feat: got timeline for beacons working

This commit is contained in:
Andrew Rioux
2025-02-24 15:10:51 -05:00
parent a57a95a98a
commit 5eb66c8d5d
11 changed files with 653 additions and 53 deletions

View File

@@ -149,6 +149,9 @@ pub async fn issue_command(
)
.execute(&db)
.await?;
let update_notifier = expect_context::<tokio::sync::broadcast::Sender::<sparse_handler::BeaconEvent>>();
update_notifier.send(sparse_handler::BeaconEvent::CommandIssued(bid, command_id))?;
}
Target::Category(cid) => {
let version = command_builder.required_version();
@@ -168,6 +171,18 @@ pub async fn issue_command(
)
.execute(&db)
.await?;
let beacon_ids = sqlx::query!(
"SELECT beacon_id FROM beacon_command_invocation WHERE command_id = ?",
command_id
)
.fetch_all(&db)
.await?;
for rec in beacon_ids {
let update_notifier = expect_context::<tokio::sync::broadcast::Sender::<sparse_handler::BeaconEvent>>();
update_notifier.send(sparse_handler::BeaconEvent::CommandIssued(rec.beacon_id, command_id))?;
}
}
}

View File

@@ -1,21 +1,63 @@
use leptos::{either::Either, prelude::*};
use std::collections::VecDeque;
use leptos::{either::{Either, EitherOf5}, prelude::*};
#[cfg(not(feature = "ssr"))]
use leptos_use::{use_websocket, use_infinite_scroll};
use leptos_router::params::Params;
use serde::{Serialize, Deserialize};
#[cfg(feature = "ssr")]
use {crate::db::user, leptos::server_fn::error::NoCustomError};
use sparse_actions::version::Version;
use sparse_actions::{actions::{Action, Actions}, version::Version};
#[cfg_attr(feature = "ssr", allow(dead_code))]
const ITEM_LOAD_AMOUNT: usize = 25;
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub enum InvokeOrResult {
Invoke,
Result
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LineItem {
CommandInvocation {
user_name: String,
invoke_date: chrono::DateTime<chrono::Utc>,
command_id: i64,
action: Actions
},
CommandResult {
user_name: String,
result_date: chrono::DateTime<chrono::Utc>,
command_id: i64,
action: Actions,
action_result: String
},
Checkin(chrono::DateTime<chrono::Utc>)
}
impl LineItem {
fn date(&self) -> chrono::DateTime<chrono::Utc> {
match self {
Self::CommandInvocation { invoke_date, .. } => invoke_date,
Self::CommandResult { result_date, .. } => result_date,
Self::Checkin(r) => r
}.clone()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BeaconViewEvent {
NewEvent(LineItem),
LoadHistorical(Vec<Result<LineItem, InvokeOrResult>>),
BeaconUpdate,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BeaconClientMessage {
LoadHistorical(i64)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -191,8 +233,85 @@ pub fn InstancesView() -> impl IntoView {
..
} = expect_context();
let display_checkins = RwSignal::new(true);
#[cfg_attr(feature = "ssr", allow(unused_variables))]
let (line_items, update_line_items) = signal(VecDeque::<Result<LineItem, InvokeOrResult>>::new());
let scroll_ref = NodeRef::<leptos::html::Div>::new();
#[cfg_attr(feature = "ssr", allow(unused_variables))]
let (done_with_scrolling, set_done_with_scrolling) = signal(false);
#[cfg(not(feature = "ssr"))]
let (web_socket, rebuild_websocket) = signal(use_websocket::<
BeaconClientMessage,
BeaconViewEvent,
codee::string::JsonSerdeCodec,
>("/api/subscribe/listener"));
#[cfg(not(feature = "ssr"))]
(web_socket.get_untracked().send)(&BeaconClientMessage::LoadHistorical(0));
#[cfg(not(feature = "ssr"))]
Effect::new(move |_| {
let user = expect_context::<ReadSignal<Option<crate::users::User>>>();
user.with(move |_| {
rebuild_websocket(use_websocket::<
BeaconClientMessage,
BeaconViewEvent,
codee::string::JsonSerdeCodec,
>(&format!(
"/api/subscribe/beacon/{}",
instance_id.get().expect("could not extract ID from URL").id)
));
});
});
#[cfg(not(feature = "ssr"))]
Effect::new(move |_| {
web_socket.with(move |uwsr| {
uwsr.message.with(move |message| {
let Some(m) = message else {
return;
};
match m {
BeaconViewEvent::NewEvent(li) => {
update_line_items.update(|lis| {
lis.push_front(Ok(li.clone()));
})
}
BeaconViewEvent::LoadHistorical(historical) => {
if historical.len() < ITEM_LOAD_AMOUNT {
set_done_with_scrolling(true);
}
update_line_items.update(|lis| {
lis.extend(historical.into_iter().map(Clone::clone))
})
}
BeaconViewEvent::BeaconUpdate => {
instance.refetch();
}
}
})
})
});
#[cfg(not(feature = "ssr"))]
let _ = use_infinite_scroll(
scroll_ref,
move |_| async move {
if done_with_scrolling.get() {
return;
}
let ws = web_socket.get();
let line_items_len = line_items.read().len();
(ws.send)(&BeaconClientMessage::LoadHistorical(line_items_len as i64));
}
);
view! {
<div class="instance">
<div class="instance" node_ref=scroll_ref>
<Suspense fallback=|| view! {"Loading..."}>
{move || Suspend::new(async move {
let configs = match configs.await {
@@ -249,7 +368,87 @@ pub fn InstancesView() -> impl IntoView {
/>
})
})}
<fieldset>
<legend>"Beacon history options"</legend>
<label>"Show checkins"</label>
<input type="checkbox" bind:value=display_checkins checked />
</fieldset>
</Suspense>
<div class="instance-line-items">
<For
each=move || line_items
.get()
.into_iter()
.filter(move |res| match &res {
Ok(LineItem::Checkin(_)) => display_checkins.get(),
_ => true
})
key=|lineitem| lineitem
.as_ref()
.map(move |li| li.date())
.map_err(Clone::clone)
let:lineitem
>
{match lineitem {
Ok(LineItem::Checkin(date)) => EitherOf5::A(view! {
<div class="instance-hist-beacon instance-hist-bodiless">
"Beacon checked in at " {format!("{}", date)}
</div>
}),
Ok(LineItem::CommandResult {
result_date,
action,
action_result,
command_id,
..
}) => EitherOf5::B(view! {
<div class="instance-hist-beacon">
<div class="instance-hist-header">
{format!("command {command_id} finished executing at {result_date}")}<br/>
</div>
<div class="instance-hist-body">
{action.render_data(action_result)}
</div>
</div>
}),
Ok(LineItem::CommandInvocation {
user_name,
invoke_date,
action,
command_id
}) => EitherOf5::C(view! {
<div class="instance-hist-user">
<div class="instance-hist-header">
{user_name.clone()} ": " {format!("command {command_id} issued at {invoke_date}")}<br />
</div>
<div class="instance-hist-body">
{action.render_empty()}
</div>
</div>
}),
Err(InvokeOrResult::Result) => EitherOf5::D(view! {
<div class="instance-hist-beacon instance-hist-bodiless">
"There was an issue decoding the command result in the database"
</div>
}),
Err(InvokeOrResult::Invoke) => EitherOf5::E(view! {
<div class="instance-hist-user instance-hist-bodiless">
"There was an issue decoding the command sent in the database"
</div>
})
}}
</For>
{move || done_with_scrolling.get().then(|| view! {
<div class="instance-hist-end">
<div class="instance-hist-end-inner">
"No more history entries exist for this beacon"
</div>
</div>
})}
</div>
</div>
}
}

View File

@@ -2,7 +2,7 @@ use std::sync::Arc;
use leptos::{either::Either, prelude::*};
use leptos_router::components::A;
#[cfg(feature = "hydrate")]
#[cfg(not(feature = "ssr"))]
use leptos_use::use_websocket;
use serde::{Deserialize, Serialize};
@@ -599,30 +599,28 @@ pub fn BeaconSidebar() -> impl IntoView {
<div class="beacon-instance-vers">
<span>"Version: "</span> {beacon.version.to_string()}
</div>
{(sort_method.get() != Some(SortMethod::Category))
.then(|| -> Vec<String> {
let BeaconResources {
categories,
..
} = expect_context::<BeaconResources>();
{Some((|| {
let BeaconResources {
categories,
..
} = expect_context::<BeaconResources>();
let Some(Ok(ref categories)) = *categories.read() else {
return vec![];
};
let Some(Ok(ref categories)) = *categories.read() else {
return vec![];
};
categories
.iter()
.filter(|cat| beacon.category_ids.contains(&cat.category_id))
.map(|cat| cat.category_name.clone())
.collect()
})
categories
.iter()
.filter(|cat| beacon.category_ids.contains(&cat.category_id))
.map(|cat| cat.category_name.clone())
.collect::<Vec<_>>()
})())
.filter(|categories| !categories.is_empty())
.map(|categories| view! {
<div class="beacon-instance-categories">
<span>"Categories: "</span> {categories.join(", ")}
</div>
})
}
})}
{(sort_method.get() != Some(SortMethod::Template))
.then(|| -> Option<String> {
let BeaconResources {

View File

@@ -532,6 +532,7 @@ async fn handle_listener_events(
pub async fn attach_to_beacon(
State(state): State<AppState>,
cookie_jar: CookieJar,
Path(beacon_id): Path<String>,
ws: ws::WebSocketUpgrade
) -> axum::response::Response {
let user = match crate::db::user::get_auth_session_inner(
@@ -551,7 +552,7 @@ pub async fn attach_to_beacon(
ws
.on_upgrade(move |socket: ws::WebSocket| async move {
if let Err(e) = handle_beacon_socket(socket, state).await {
if let Err(e) = handle_beacon_socket(beacon_id, socket, state).await {
tracing::warn!("Encountered error when handling beacon subscriber: {e}");
};
})
@@ -559,12 +560,237 @@ pub async fn attach_to_beacon(
}
async fn handle_beacon_socket(
beacon_id: String,
mut socket: ws::WebSocket,
state: AppState,
) -> Result<(), crate::error::Error> {
unimplemented!()
}
let mut event_receiver = state.beacon_event_broadcast.subscribe();
use crate::beacons::instances::{BeaconClientMessage, BeaconViewEvent, InvokeOrResult, LineItem};
macro_rules! send_event {
($ev:expr) => {{
let ev = $ev;
let json = serde_json::to_string(&ev)?;
socket.send(ws::Message::Text(json)).await?;
}}
}
loop {
use sparse_handler::BeaconEvent;
tokio::select! {
event = event_receiver.recv() => match event {
Ok(BeaconEvent::BeaconCommandFinished(bid, cid)) if bid == beacon_id => {
struct InvokeInfo {
params: String,
date: chrono::DateTime<chrono::Utc>,
res: String,
user_name: String
}
type DateTime = chrono::DateTime<chrono::Utc>;
// Null safety: the only time the above event is published is after
// the database has updated the two values
// It would maybe wise to update the event to instead pass the results...
let invoke_info = sqlx::query_as!(
InvokeInfo,
r#"SELECT cmd.cmd_parameters as params, bci.invocation_result as "res!",
bci.invocation_date as "date!: DateTime", users.user_name as user_name
FROM beacon_command_invocation bci
INNER JOIN users ON users.user_id = bci.invoker_id
INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
WHERE bci.command_id = ? AND bci.beacon_id = ?"#,
cid,
bid
)
.fetch_one(&state.db)
.await?;
send_event!(BeaconViewEvent::NewEvent(LineItem::CommandResult {
user_name: invoke_info.user_name,
result_date: invoke_info.date,
command_id: cid,
action: serde_json::from_str(&invoke_info.params)?,
action_result: invoke_info.res
}));
}
Ok(BeaconEvent::BeaconUpdate(bid)) if bid == beacon_id => {
send_event!(BeaconViewEvent::BeaconUpdate);
}
Ok(BeaconEvent::Checkin(bid)) if bid == beacon_id => {
send_event!(BeaconViewEvent::NewEvent(LineItem::Checkin(chrono::Utc::now())));
}
Ok(BeaconEvent::CommandIssued(bid, cid)) if bid == beacon_id => {
struct InvokeInfo {
params: String,
date: chrono::DateTime<chrono::Utc>,
user_name: String
}
type DateTime = chrono::DateTime<chrono::Utc>;
// Null safety: the only time the above event is published is after
// the database has updated the two values
// It would maybe wise to update the event to instead pass the results...
let invoke_info = sqlx::query_as!(
InvokeInfo,
r#"SELECT cmd.cmd_parameters as params, bci.issue_date as "date: DateTime",
users.user_name as user_name
FROM beacon_command_invocation bci
INNER JOIN users ON users.user_id = bci.invoker_id
INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
WHERE bci.command_id = ? AND bci.beacon_id = ?"#,
cid,
bid
)
.fetch_one(&state.db)
.await?;
send_event!(BeaconViewEvent::NewEvent(LineItem::CommandInvocation {
user_name: invoke_info.user_name,
invoke_date: invoke_info.date,
command_id: cid,
action: serde_json::from_str(&invoke_info.params)?,
}));
}
Ok(_) => {
// not for us
}
Err(e) => {
tracing::warn!("Unable to handle general event: {e:?}");
}
},
user_event = socket.recv() => match user_event {
Some(Ok(ws::Message::Text(ev))) => {
let Ok(msg) = serde_json::from_str::<BeaconClientMessage>(&ev) else {
continue;
};
let BeaconClientMessage::LoadHistorical(offset) = msg;
#[derive(Debug)]
enum DbLineItem {
CommandInvocation {
user_name: String,
invoke_date: chrono::DateTime<chrono::Utc>,
command_id: i64,
action: String
},
CommandResult {
user_name: String,
result_date: chrono::DateTime<chrono::Utc>,
command_id: i64,
action: String,
action_result: String
},
Checkin(chrono::DateTime<chrono::Utc>)
}
use sqlx::{FromRow, sqlite::SqliteRow, Row};
impl FromRow<'_, SqliteRow> for DbLineItem {
fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
match row.try_get("item")? {
"checkin" => Ok(Self::Checkin(
row.try_get("item_date")?
)),
"invoke" => Ok(Self::CommandInvocation {
invoke_date: row.try_get("item_date")?,
user_name: row.try_get(2)?,
command_id: row.try_get(3)?,
action: row.try_get(4)?
}),
"result" => Ok(Self::CommandResult {
result_date: row.try_get("item_date")?,
user_name: row.try_get(5)?,
command_id: row.try_get(6)?,
action: row.try_get(7)?,
action_result: row.try_get(8)?
}),
type_name => Err(sqlx::Error::TypeNotFound {
type_name: type_name.to_string(),
}),
}
}
}
let info: Vec<DbLineItem> = sqlx::query_as(
r#"SELECT 'checkin' as item, checkin_date as item_date,
NULL, NULL, NULL,
NULL, NULL, NULL, NULL
FROM beacon_checkin WHERE beacon_id = ?
UNION
SELECT 'invoke' as item, bci.issue_date as item_date,
users.user_name, bci.command_id, cmd.cmd_parameters,
NULL, NULL, NULL, NULL
FROM beacon_command_invocation bci
INNER JOIN users ON users.user_id = bci.invoker_id
INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
WHERE bci.beacon_id = ?
UNION
SELECT 'result' as item, bci.invocation_date as item_date,
NULL, NULL, NULL,
users.user_name, bci.command_id, cmd.cmd_parameters, bci.invocation_result
FROM beacon_command_invocation bci
INNER JOIN users ON users.user_id = bci.invoker_id
INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
WHERE bci.beacon_id = ? AND bci.invocation_result IS NOT NULL
ORDER BY item_date DESC
LIMIT ?, 25"#,
)
.bind(&beacon_id)
.bind(&beacon_id)
.bind(&beacon_id)
.bind(offset)
.fetch_all(&state.db)
.await?;
let info = info
.into_iter()
.map(|r| match r {
DbLineItem::CommandInvocation {
user_name,
invoke_date,
command_id,
action
} => serde_json::from_str(&action)
.map_err(|_| InvokeOrResult::Invoke)
.map(|action| LineItem::CommandInvocation {
user_name,
invoke_date,
command_id,
action
}),
DbLineItem::CommandResult {
user_name,
result_date,
command_id,
action,
action_result
} => serde_json::from_str(&action)
.map_err(|_| InvokeOrResult::Result)
.map(|action| LineItem::CommandResult {
user_name,
result_date,
command_id,
action,
action_result
}),
DbLineItem::Checkin(d) => Ok(LineItem::Checkin(d)),
})
.collect();
send_event!(BeaconViewEvent::LoadHistorical(info));
}
Some(Ok(ws::Message::Close(_))) => {
break Ok(());
}
_ => {}
}
}
}
}
pub async fn serve_web(
management_address: SocketAddrV4,
@@ -608,6 +834,10 @@ pub async fn serve_web(
"/api/subscribe/listener",
axum::routing::any(subscribe_to_listener_events)
)
.route(
"/api/subscribe/beacon/:beacon_id",
axum::routing::any(attach_to_beacon)
)
.route("/api/*fn_name", post(leptos_axum::handle_server_fns))
.leptos_routes_with_context(
&state,