From 50b5c1a92176391d5923f2dfbe3ef5256e6b09b8 Mon Sep 17 00:00:00 2001 From: Andrew Rioux Date: Wed, 6 Sep 2023 14:56:21 -0400 Subject: [PATCH] feat: added upload file capability --- Makefile.toml | 2 +- .../src/commands/connect/commands/download.rs | 93 +++++++++++++++++++ .../src/commands/connect/commands/upload.rs | 82 ++++++++++++++++ .../src/commands/connect/shell.rs | 15 +++ sparse-05/sparse-05-common/src/lib.rs | 7 +- .../sparse-05-server/src/capabilities.rs | 5 + sparse-05/sparse-05-server/src/connection.rs | 9 ++ .../src/connection/download_file.rs | 0 .../src/connection/upload_file.rs | 69 +++++++++++++- 9 files changed, 277 insertions(+), 5 deletions(-) create mode 100644 sparse-05/sparse-05-server/src/connection/download_file.rs diff --git a/Makefile.toml b/Makefile.toml index 8aaaf47..b4bb2a1 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -23,7 +23,7 @@ args = ["run", "build", "build", "${@}"] workspace = false script = [ "docker-compose run build build --bin sparse-05-server ${@}", - "docker-compose run build build --bin sparse-05-server --target x86_64-pc-windows-gnu ${@}", +# "docker-compose run build build --bin sparse-05-server --target x86_64-pc-windows-gnu ${@}", "docker-compose run build build --bin sparse-05-client ${@}", ] diff --git a/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs b/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs index e69de29..c1d34e5 100644 --- a/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs +++ b/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs @@ -0,0 +1,93 @@ +use std::{path::PathBuf, sync::Arc}; + +use tokio::{ + fs, + io::{self, AsyncReadExt}, +}; + +use sparse_05_common::messages::{ + Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE, +}; + +use crate::commands::connect::Connection; + +pub async fn download_file( + conn: Arc, + remote_file: PathBuf, + local_path: PathBuf, +) -> anyhow::Result<()> { + let mut file = fs::OpenOptions::new().read(true).open(&local_path).await?; + let file_size = file.metadata().await?.len(); + + let command = Command::StartUploadFile( + remote_path, + (file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1, + ); + conn.send_command(command).await?; + + let id = loop { + let resp = conn.get_response().await?; + + if let Response::UploadFileID(id) = resp { + break id; + } + }; + + loop { + let mut file_data: Vec> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); + let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; + let mut done = false; + + for _ in 0..FILE_BUFFER_BUFFER_SIZE { + let amount = file.read(&mut buffer).await?; + + if amount == 0 { + done = true; + break; + } + + file_data.push(buffer[..amount].to_vec()); + } + + for (i, buffer) in file_data.iter().enumerate() { + let comm = Command::SendFileSegment(id, i as u64, buffer.clone()); + conn.send_command(comm).await?; + } + + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64)) + .await?; + + let buffers_needed = loop { + let resp = conn.get_response().await?; + + if let Response::UploadFileStatus(iid, buffers) = resp { + if id == iid { + break buffers; + } + } + }; + + if buffers_needed.len() == 0 { + break; + } + + for buffer_needed in &buffers_needed { + let comm = Command::SendFileSegment( + id, + *buffer_needed, + file_data[*buffer_needed as usize].clone(), + ); + conn.send_command(comm).await?; + } + } + + if done { + break; + } + } + + Ok(()) +} diff --git a/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs b/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs index 121051b..bf2ced3 100644 --- a/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs +++ b/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs @@ -1,5 +1,14 @@ use std::{path::PathBuf, sync::Arc}; +use tokio::{ + fs, + io::{self, AsyncReadExt}, +}; + +use sparse_05_common::messages::{ + Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE, +}; + use crate::commands::connect::Connection; pub async fn upload_file( @@ -7,5 +16,78 @@ pub async fn upload_file( local_path: PathBuf, remote_path: PathBuf, ) -> anyhow::Result<()> { + let mut file = fs::OpenOptions::new().read(true).open(&local_path).await?; + let file_size = file.metadata().await?.len(); + + let command = Command::StartUploadFile( + remote_path, + (file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1, + ); + conn.send_command(command).await?; + + let id = loop { + let resp = conn.get_response().await?; + + if let Response::UploadFileID(id) = resp { + break id; + } + }; + + loop { + let mut file_data: Vec> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); + let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; + let mut done = false; + + for _ in 0..FILE_BUFFER_BUFFER_SIZE { + let amount = file.read(&mut buffer).await?; + + if amount == 0 { + done = true; + break; + } + + file_data.push(buffer[..amount].to_vec()); + } + + for (i, buffer) in file_data.iter().enumerate() { + let comm = Command::SendFileSegment(id, i as u64, buffer.clone()); + conn.send_command(comm).await?; + } + + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64)) + .await?; + + let buffers_needed = loop { + let resp = conn.get_response().await?; + + if let Response::UploadFileStatus(iid, buffers) = resp { + if id == iid { + break buffers; + } + } + }; + + if buffers_needed.len() == 0 { + break; + } + + for buffer_needed in &buffers_needed { + let comm = Command::SendFileSegment( + id, + *buffer_needed, + file_data[*buffer_needed as usize].clone(), + ); + conn.send_command(comm).await?; + } + } + + if done { + break; + } + } + Ok(()) } diff --git a/sparse-05/sparse-05-client/src/commands/connect/shell.rs b/sparse-05/sparse-05-client/src/commands/connect/shell.rs index 7519b79..2d4db8c 100644 --- a/sparse-05/sparse-05-client/src/commands/connect/shell.rs +++ b/sparse-05/sparse-05-client/src/commands/connect/shell.rs @@ -171,6 +171,7 @@ pub(super) async fn shell( let Ok(input) = std::str::from_utf8(&cmd) else { continue; }; + let input = input.trim(); let (args, help) = if input.starts_with("#help") { (input.split(" ").collect::>(), true) @@ -207,6 +208,20 @@ pub(super) async fn shell( eprintln!("{e:?}") } } + ( + Ok(SparseCommands::DownloadFile { + local_path, + remote_file, + }), + _, + ) => { + if let Err(e) = + commands::download::download(Arc::clone(&connection), remote_file, local_path) + .await + { + eprintln!("{e:?}") + } + } _ => { if let Err(e) = run_command( &mut stdin_receiver, diff --git a/sparse-05/sparse-05-common/src/lib.rs b/sparse-05/sparse-05-common/src/lib.rs index 347becc..6fcdee4 100644 --- a/sparse-05/sparse-05-common/src/lib.rs +++ b/sparse-05/sparse-05-common/src/lib.rs @@ -4,6 +4,8 @@ pub const CONFIG_SEPARATOR: &'static [u8] = pub mod messages { pub const CONNECT_MESSAGE: &'static [u8] = b"CONNECT"; pub const CONNECTED_MESSAGE: &'static [u8] = b"CONNECTED"; + pub const FILE_TRANSFER_PACKET_SIZE: usize = 1024; + pub const FILE_BUFFER_BUFFER_SIZE: usize = 512; use std::{ffi::OsString, path::PathBuf}; @@ -25,9 +27,10 @@ pub mod messages { StartUploadFile(PathBuf, u64), SendFileSegment(u64, u64, Vec), + GetUploadStatus(u64, u64), StartDownloadFile(PathBuf), - DownloadFileStatus(u64, Result<(), Vec>), + DownloadFileStatus(u64, Vec), Disconnect, } @@ -74,7 +77,7 @@ pub mod messages { SendTTYData(u64, Vec), UploadFileID(u64), - UploadFileStatus(u64, Result<(), Vec>), + UploadFileStatus(u64, Vec), StartDownloadFile(u64), DownloadFileSegment(u64, u64, Vec), diff --git a/sparse-05/sparse-05-server/src/capabilities.rs b/sparse-05/sparse-05-server/src/capabilities.rs index 73da9ef..0711978 100644 --- a/sparse-05/sparse-05-server/src/capabilities.rs +++ b/sparse-05/sparse-05-server/src/capabilities.rs @@ -2,10 +2,14 @@ use std::ffi::c_int; use sparse_05_common::messages::{Capabilities, OperatingSystem, TransportType}; +#[cfg(target_os = "linux")] const CAP_SETUID: u32 = 1 << 7; +#[cfg(target_os = "linux")] const CAP_NET_RAW: u32 = 1 << 13; +#[cfg(target_os = "linux")] const SYS_CAPGET: i64 = 125; +#[cfg(target_os = "linux")] #[allow(non_camel_case_types)] #[repr(C)] #[derive(Debug)] @@ -14,6 +18,7 @@ struct cap_user_header_t { pid: c_int, } +#[cfg(target_os = "linux")] #[allow(non_camel_case_types)] #[repr(C)] #[derive(Debug)] diff --git a/sparse-05/sparse-05-server/src/connection.rs b/sparse-05/sparse-05-server/src/connection.rs index c38b11d..733ddd5 100644 --- a/sparse-05/sparse-05-server/src/connection.rs +++ b/sparse-05/sparse-05-server/src/connection.rs @@ -317,6 +317,15 @@ where let _ = handler.data_sender.send((number, bytes)); } } + Command::GetUploadStatus(id, up_to) => { + let Ok(lock) = uploaded_files.lock() else { + continue; + }; + + if let Some(handler) = lock.get(&id) { + let _ = handler.request_status.send(up_to); + } + } Command::StartDownloadFile(_) => {} Command::DownloadFileStatus(_, _) => {} diff --git a/sparse-05/sparse-05-server/src/connection/download_file.rs b/sparse-05/sparse-05-server/src/connection/download_file.rs new file mode 100644 index 0000000..e69de29 diff --git a/sparse-05/sparse-05-server/src/connection/upload_file.rs b/sparse-05/sparse-05-server/src/connection/upload_file.rs index 28af231..a08d8fc 100644 --- a/sparse-05/sparse-05-server/src/connection/upload_file.rs +++ b/sparse-05/sparse-05-server/src/connection/upload_file.rs @@ -1,5 +1,7 @@ use std::{ collections::HashMap, + fs::{self, OpenOptions}, + io::Write, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering}, @@ -7,8 +9,11 @@ use std::{ Arc, Mutex, }, thread::Scope, + time::Duration, }; +use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE}; + use super::ConnectionInformation; static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0); @@ -16,6 +21,7 @@ static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0); pub(super) struct UploadFileHandler { pub id: u64, pub data_sender: Sender<(u64, Vec)>, + pub request_status: Sender, } pub(super) fn start_file_upload<'a, 'b: 'a>( @@ -26,10 +32,69 @@ pub(super) fn start_file_upload<'a, 'b: 'a>( upload_file_map: Arc>>, ) -> anyhow::Result { let (data_sender, data_receiver) = channel(); + let (request_status, receive_request_status) = channel(); let id = CURRENT_FILE_UPLOAD_ID.fetch_add(1, Ordering::Relaxed); + let id_2 = id; - let buffer: Vec> = Vec::with_capacity(packet_count as usize); + s.spawn(move || -> anyhow::Result<()> { + conninfo.send(conninfo.encrypt_and_sign_resp(Response::UploadFileID(id))?)?; - Ok(UploadFileHandler { id, data_sender }) + let mut target_file = OpenOptions::new() + .write(true) + .create(true) + .open(&file_path)?; + let mut current_packet_count = 0; + + while current_packet_count < packet_count { + let mut buffers: Vec>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; + + loop { + let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else { + let up_to = receive_request_status.recv()?; + + let needed = buffers[..up_to as usize] + .iter() + .enumerate() + .flat_map(|(i, b)| match b { + Some(..) => None, + None => Some(i as u64), + }) + .collect::>(); + + let is_empty = needed.is_empty(); + + conninfo.send( + conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?, + )?; + + if is_empty { + current_packet_count += up_to; + break; + } else { + continue; + } + }; + buffers[i as usize] = Some(buffer); + } + + for buffer in buffers { + let Some(buffer) = buffer else { break }; + target_file.write(&buffer)?; + } + } + + let Ok(mut lock) = upload_file_map.lock() else { + return Ok(()); + }; + lock.remove(&id); + + Ok(()) + }); + + Ok(UploadFileHandler { + id: id_2, + data_sender, + request_status, + }) }