feat: added the ability to start new listeners
This commit is contained in:
@@ -3,8 +3,97 @@ use std::{
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
use axum::routing::{get, post, Router};
|
||||
use axum_server::{service::MakeService, tls_rustls::RustlsConfig};
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub type BeaconListenerHandle = JoinHandle<()>;
|
||||
pub struct BeaconListenerHandle {
|
||||
join_handle: JoinHandle<()>
|
||||
}
|
||||
|
||||
impl BeaconListenerHandle {
|
||||
pub fn is_finished(&self) -> bool {
|
||||
self.join_handle.is_finished()
|
||||
}
|
||||
|
||||
pub fn abort(&self) {
|
||||
self.join_handle.abort()
|
||||
}
|
||||
}
|
||||
|
||||
pub type BeaconListenerMap = Arc<RwLock<HashMap<i64, BeaconListenerHandle>>>;
|
||||
|
||||
pub async fn start_all_listeners(beacon_listener_map: BeaconListenerMap, db: SqlitePool) -> Result<(), crate::error::Error> {
|
||||
let listener_ids = sqlx::query!("SELECT listener_id FROM beacon_listener")
|
||||
.fetch_all(&db)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Starting {} listener(s)...", listener_ids.len());
|
||||
|
||||
for listener in listener_ids {
|
||||
start_listener(Arc::clone(&beacon_listener_map), listener.listener_id, db.clone()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ListenerState {
|
||||
db: SqlitePool
|
||||
}
|
||||
|
||||
pub async fn start_listener(beacon_listener_map: BeaconListenerMap, listener_id: i64, db: SqlitePool) -> Result<(), crate::error::Error> {
|
||||
{
|
||||
let Ok(blm_handle) = beacon_listener_map.read() else {
|
||||
return Err(crate::error::Error::Generic("Could not acquire write lock on beacon listener map".to_string()));
|
||||
};
|
||||
|
||||
if blm_handle.get(&listener_id).is_some() {
|
||||
return Err(crate::error::Error::Generic("Beacon listener already started".to_string()));
|
||||
}
|
||||
}
|
||||
let listener = sqlx::query!("SELECT * FROM beacon_listener WHERE listener_id = ?", listener_id)
|
||||
.fetch_one(&db)
|
||||
.await?;
|
||||
|
||||
let app: Router<()> = Router::new()
|
||||
.route("/register_beacon", post(|| async {}))
|
||||
.route("/test", get(|| async {
|
||||
"hi there"
|
||||
}))
|
||||
.with_state(ListenerState {
|
||||
db
|
||||
});
|
||||
|
||||
let hidden_app = Router::new().nest("/hidden_sparse", app);
|
||||
|
||||
let tls_config = RustlsConfig::from_pem(
|
||||
listener.certificate.as_bytes().to_vec(),
|
||||
listener.privkey.as_bytes().to_vec()
|
||||
).await?;
|
||||
|
||||
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], listener.port as u16));
|
||||
|
||||
tracing::debug!("Starting listener {}, {}, on port {}", listener_id, listener.domain_name, listener.port);
|
||||
|
||||
let join_handle = tokio::task::spawn(async move {
|
||||
let res = axum_server::tls_rustls::bind_rustls(addr, tls_config)
|
||||
.serve(hidden_app.into_make_service())
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::error!("error running sparse listener: {e:?}");
|
||||
}
|
||||
});
|
||||
|
||||
let Ok(mut blm_handle) = beacon_listener_map.write() else {
|
||||
return Err(crate::error::Error::Generic("Could not acquire write lock on beacon listener map".to_string()));
|
||||
};
|
||||
|
||||
blm_handle.insert(listener_id, BeaconListenerHandle {
|
||||
join_handle
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -111,7 +111,19 @@ pub async fn remove_listener(listener_id: i64) -> Result<(), ServerFnError> {
|
||||
|
||||
#[server]
|
||||
pub async fn start_listener(listener_id: i64) -> Result<(), ServerFnError> {
|
||||
unimplemented!()
|
||||
let user = user::get_auth_session().await?;
|
||||
|
||||
if user.is_none() {
|
||||
return Err(ServerFnError::<NoCustomError>::ServerError("You are not signed in!".to_owned()));
|
||||
}
|
||||
|
||||
crate::beacon_handler::start_listener(
|
||||
expect_context(),
|
||||
listener_id,
|
||||
expect_context()
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[component]
|
||||
@@ -193,7 +205,7 @@ fn DisplayListeners(listeners: Vec<PubListener>) -> impl IntoView {
|
||||
{listener.public_ip.clone()}
|
||||
":"
|
||||
{listener.port}
|
||||
")"
|
||||
") "
|
||||
{match listener.active {
|
||||
true => Either::Left(view! {
|
||||
<span>"active!"</span>
|
||||
|
||||
@@ -30,10 +30,6 @@ pub enum Command {
|
||||
/// Address to bind to for the management interface
|
||||
#[structopt(default_value = "127.0.0.1:3000")]
|
||||
management_address: SocketAddrV4,
|
||||
|
||||
/// Public address to bind to for the beacons to call back to
|
||||
#[structopt(default_value = "127.0.0.1:5000")]
|
||||
bind_address: SocketAddrV4,
|
||||
},
|
||||
|
||||
/// Extract the public key and print it to standard out
|
||||
|
||||
@@ -8,6 +8,8 @@ pub enum Error {
|
||||
TokioJoin(tokio::task::JoinError),
|
||||
#[cfg(feature = "ssr")]
|
||||
Pbkdf2(pbkdf2::password_hash::errors::Error),
|
||||
#[cfg(feature = "ssr")]
|
||||
Io(std::io::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
@@ -31,6 +33,10 @@ impl std::fmt::Display for Error {
|
||||
Error::Pbkdf2(err) => {
|
||||
write!(f, "password hash error: {err:?}")
|
||||
}
|
||||
#[cfg(feature = "ssr")]
|
||||
Error::Io(err) => {
|
||||
write!(f, "io error: {err:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -42,6 +48,8 @@ impl std::error::Error for Error {
|
||||
Error::Sqlx(err) => Some(err),
|
||||
#[cfg(feature = "ssr")]
|
||||
Error::TokioJoin(err) => Some(err),
|
||||
#[cfg(feature = "ssr")]
|
||||
Error::Io(err) => Some(err),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -75,3 +83,10 @@ impl From<pbkdf2::password_hash::errors::Error> for Error {
|
||||
Self::Pbkdf2(err)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ssr")]
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
Self::Io(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,9 +79,9 @@ async fn main() -> anyhow::Result<std::process::ExitCode> {
|
||||
tracing::info!("Done running database migrations!");
|
||||
|
||||
match options.command.clone() {
|
||||
Some(cli::Command::Serve { management_address, bind_address }) => {
|
||||
Some(cli::Command::Serve { management_address }) => {
|
||||
tracing::info!("Performing requested action, acting as web server");
|
||||
webserver::serve_web(management_address, bind_address, pool).await
|
||||
webserver::serve_web(management_address, pool).await
|
||||
}
|
||||
Some(cli::Command::ExtractPubKey { }) => {
|
||||
Ok(ExitCode::SUCCESS)
|
||||
@@ -95,8 +95,7 @@ async fn main() -> anyhow::Result<std::process::ExitCode> {
|
||||
tracing::info!("Performing default action of acting as web server");
|
||||
|
||||
let default_management_ip = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 3000);
|
||||
let default_beacon_ip = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 5000);
|
||||
webserver::serve_web(default_management_ip, default_beacon_ip, pool).await
|
||||
webserver::serve_web(default_management_ip, pool).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use {
|
||||
crate::db::user
|
||||
};
|
||||
|
||||
fn format_delta(time: chrono::TimeDelta) -> String {
|
||||
pub fn format_delta(time: chrono::TimeDelta) -> String {
|
||||
let seconds = time.num_seconds();
|
||||
|
||||
match seconds {
|
||||
|
||||
@@ -8,7 +8,7 @@ use tokio::signal;
|
||||
|
||||
use sparse_server::app::*;
|
||||
|
||||
pub async fn serve_web(management_address: SocketAddrV4, _bind_address: SocketAddrV4, db: SqlitePool) -> anyhow::Result<ExitCode> {
|
||||
pub async fn serve_web(management_address: SocketAddrV4, db: SqlitePool) -> anyhow::Result<ExitCode> {
|
||||
let conf = get_configuration(None).unwrap();
|
||||
let leptos_options = conf.leptos_options;
|
||||
let routes = generate_route_list(App);
|
||||
@@ -20,6 +20,8 @@ pub async fn serve_web(management_address: SocketAddrV4, _bind_address: SocketAd
|
||||
.br(true)
|
||||
.zstd(true);
|
||||
|
||||
crate::beacon_handler::start_all_listeners(std::sync::Arc::clone(&beacon_listeners), db.clone()).await?;
|
||||
|
||||
let app = Router::new()
|
||||
.leptos_routes_with_context(
|
||||
&leptos_options,
|
||||
|
||||
Reference in New Issue
Block a user