feat: added new packages

This commit is contained in:
Andrew Rioux
2025-02-01 15:59:56 -05:00
parent f5afd60086
commit b416f35b63
28 changed files with 425 additions and 358 deletions

16
sparse-handler/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "sparse-handler"
edition = "2024"
version.workspace = true
[dependencies]
axum = { version = "^0.7", features = ["ws", "macros"] }
tokio = { version = "1", features = ["rt-multi-thread", "signal"] }
anyhow = "1.0"
futures = "0.3"
tokio-stream = "0.1"
tracing = "0.1"
sqlx = { version = "0.8", default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "sqlx-sqlite"] }
serde = "1.0"
serde_json = "1.0"
axum-server = { version = "^0.7", features = ["tokio-rustls", "tls-rustls"] }

View File

@@ -0,0 +1,63 @@
#[derive(Debug)]
pub enum Error {
Generic(String),
Sqlx(sqlx::Error),
TokioJoin(tokio::task::JoinError),
Io(std::io::Error),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Generic(err) => {
write!(f, "generic error: {err}")
}
Error::Sqlx(err) => {
write!(f, "sqlx error: {err:?}")
}
Error::TokioJoin(err) => {
write!(f, "tokio join error: {err:?}")
}
Error::Io(err) => {
write!(f, "io error: {err:?}")
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Sqlx(err) => Some(err),
Error::TokioJoin(err) => Some(err),
Error::Io(err) => Some(err),
_ => None,
}
}
}
impl std::str::FromStr for Error {
type Err = Self;
fn from_str(err: &str) -> Result<Self, Self::Err> {
Ok(Self::Generic(err.to_string()))
}
}
impl From<sqlx::Error> for Error {
fn from(err: sqlx::Error) -> Self {
Self::Sqlx(err)
}
}
impl From<tokio::task::JoinError> for Error {
fn from(err: tokio::task::JoinError) -> Self {
Self::TokioJoin(err)
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}

115
sparse-handler/src/lib.rs Normal file
View File

@@ -0,0 +1,115 @@
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use axum::routing::{get, post, Router};
use axum_server::tls_rustls::RustlsConfig;
use sqlx::SqlitePool;
use tokio::task::JoinHandle;
pub mod error;
pub struct BeaconListenerHandle {
join_handle: JoinHandle<()>
}
impl BeaconListenerHandle {
pub fn is_finished(&self) -> bool {
self.join_handle.is_finished()
}
pub fn abort(&self) {
self.join_handle.abort()
}
}
#[derive(Clone, Default)]
pub struct BeaconListenerMap(Arc<RwLock<HashMap<i64, BeaconListenerHandle>>>);
impl std::ops::Deref for BeaconListenerMap {
type Target = Arc<RwLock<HashMap<i64, BeaconListenerHandle>>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub async fn start_all_listeners(beacon_listener_map: BeaconListenerMap, db: SqlitePool) -> Result<(), crate::error::Error> {
tracing::debug!("Typeid: {:?}", std::any::TypeId::of::<BeaconListenerMap>());
let listener_ids = sqlx::query!("SELECT listener_id FROM beacon_listener")
.fetch_all(&db)
.await?;
tracing::info!("Starting {} listener(s)...", listener_ids.len());
for listener in listener_ids {
start_listener(beacon_listener_map.clone(), listener.listener_id, db.clone()).await?;
}
Ok(())
}
#[derive(Clone)]
struct ListenerState {
db: SqlitePool
}
pub async fn start_listener(beacon_listener_map: BeaconListenerMap, listener_id: i64, db: SqlitePool) -> Result<(), crate::error::Error> {
{
let Ok(blm_handle) = beacon_listener_map.read() else {
return Err(crate::error::Error::Generic("Could not acquire write lock on beacon listener map".to_string()));
};
if blm_handle.get(&listener_id).is_some() {
return Err(crate::error::Error::Generic("Beacon listener already started".to_string()));
}
}
let listener = sqlx::query!("SELECT * FROM beacon_listener WHERE listener_id = ?", listener_id)
.fetch_one(&db)
.await?;
let app: Router<()> = Router::new()
.route("/register_beacon", post(|| async {
tracing::info!("Beacon attempting to register");
}))
.route("/test", get(|| async {
tracing::info!("Hello");
"hi there"
}))
.with_state(ListenerState {
db
});
let hidden_app = Router::new().nest("/hidden_sparse", app);
let tls_config = RustlsConfig::from_pem(
listener.certificate.as_bytes().to_vec(),
listener.privkey.as_bytes().to_vec()
).await?;
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], listener.port as u16));
tracing::debug!("Starting listener {}, {}, on port {}", listener_id, listener.domain_name, listener.port);
let join_handle = tokio::task::spawn(async move {
let res = axum_server::tls_rustls::bind_rustls(addr, tls_config)
.serve(hidden_app.into_make_service())
.await;
if let Err(e) = res {
tracing::error!("error running sparse listener: {e:?}");
}
});
let Ok(mut blm_handle) = beacon_listener_map.write() else {
return Err(crate::error::Error::Generic("Could not acquire write lock on beacon listener map".to_string()));
};
blm_handle.insert(listener_id, BeaconListenerHandle {
join_handle
});
Ok(())
}