diff --git a/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs b/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs index 3654dc9..e10047d 100644 --- a/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs +++ b/sparse-05/sparse-05-client/src/commands/connect/commands/download.rs @@ -30,6 +30,7 @@ pub async fn download_file( }; let mut current_packet_count = 0; + let mut current_buffer = 0; while current_packet_count < packet_count { let mut buffers: Vec>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; @@ -39,7 +40,7 @@ pub async fn download_file( Ok(Response::DownloadFileSegment(fid, bid, buf)) if fid == id => { buffers[bid as usize] = Some(buf); } - Ok(Response::GetDownloadFileStatus(fid, up_to)) if fid == id => { + Ok(Response::GetDownloadFileStatus(fid, bid, up_to)) if fid == id && bid == current_buffer => { let needed = buffers[..up_to as usize] .iter() .enumerate() @@ -51,9 +52,10 @@ pub async fn download_file( let is_empty = needed.is_empty(); - conn.send_command(Command::DownloadFileStatus(id, needed)) + conn.send_command(Command::DownloadFileStatus(id, bid, needed)) .await?; + current_buffer += 1; if is_empty { current_packet_count += up_to; break; diff --git a/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs b/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs index c934002..58207d3 100644 --- a/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs +++ b/sparse-05/sparse-05-client/src/commands/connect/commands/upload.rs @@ -1,6 +1,6 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::{fs, io::AsyncReadExt}; +use tokio::{fs, io::AsyncReadExt, time::timeout}; use sparse_05_common::messages::{ Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE, @@ -30,6 +30,8 @@ pub async fn upload_file( } }; + let mut current_buffer = 0; + loop { let mut file_data: Vec> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; @@ -52,21 +54,30 @@ pub async fn upload_file( } loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64)) + conn.send_command(Command::GetUploadStatus(id, current_buffer, file_data.len() as u64)) .await?; - let buffers_needed = loop { - let resp = conn.get_response().await?; + println!("Requesting status..."); - if let Response::UploadFileStatus(iid, buffers) = resp { - if id == iid { - break buffers; + let buffers_needed = loop { + let resp = timeout(Duration::from_millis(125), conn.get_response()).await; + + match resp { + Ok(Ok(Response::UploadFileStatus(iid, bid, buffers))) => { + if id == iid && bid == current_buffer { + break buffers; + } + } + Ok(..) => {} + Err(..) => { + conn.send_command(Command::GetUploadStatus(id, current_buffer, file_data.len() as u64)) + .await?; } } }; + println!("Sent buffer {current_buffer}; packets needed: {:?}", buffers_needed); + if buffers_needed.len() == 0 { break; } @@ -79,8 +90,12 @@ pub async fn upload_file( ); conn.send_command(comm).await?; } + + println!("Resent packets"); } + current_buffer += 1; + if done { break; } diff --git a/sparse-05/sparse-05-common/src/lib.rs b/sparse-05/sparse-05-common/src/lib.rs index df2df6b..79510b4 100644 --- a/sparse-05/sparse-05-common/src/lib.rs +++ b/sparse-05/sparse-05-common/src/lib.rs @@ -4,7 +4,7 @@ pub const CONFIG_SEPARATOR: &'static [u8] = pub mod messages { pub const CONNECT_MESSAGE: &'static [u8] = b"CONNECT"; pub const CONNECTED_MESSAGE: &'static [u8] = b"CONNECTED"; - pub const FILE_TRANSFER_PACKET_SIZE: usize = 1024; + pub const FILE_TRANSFER_PACKET_SIZE: usize = 768; pub const FILE_BUFFER_BUFFER_SIZE: usize = 512; use std::{ffi::OsString, path::PathBuf}; @@ -27,10 +27,10 @@ pub mod messages { StartUploadFile(PathBuf, u64), SendFileSegment(u64, u64, Vec), - GetUploadStatus(u64, u64), + GetUploadStatus(u64, u64, u64), StartDownloadFile(PathBuf), - DownloadFileStatus(u64, Vec), + DownloadFileStatus(u64, u64, Vec), Disconnect, } @@ -77,10 +77,10 @@ pub mod messages { SendTTYData(u64, Vec), UploadFileID(u64), - UploadFileStatus(u64, Vec), + UploadFileStatus(u64, u64, Vec), StartDownloadFile(u64, u64), - GetDownloadFileStatus(u64, u64), + GetDownloadFileStatus(u64, u64, u64), DownloadFileSegment(u64, u64, Vec), } diff --git a/sparse-05/sparse-05-server/src/connection.rs b/sparse-05/sparse-05-server/src/connection.rs index 966507e..6596f25 100644 --- a/sparse-05/sparse-05-server/src/connection.rs +++ b/sparse-05/sparse-05-server/src/connection.rs @@ -380,16 +380,16 @@ where }; if let Some(handler) = lock.get(&id) { - let _ = handler.data_sender.send((number, bytes)); + let _ = handler.msg_sender.send(upload_file::UploadFileMsg::Data((number, bytes))); } } - Command::GetUploadStatus(id, up_to) => { + Command::GetUploadStatus(id, bid, up_to) => { let Ok(lock) = uploaded_files.lock() else { continue; }; if let Some(handler) = lock.get(&id) { - let _ = handler.request_status.send(up_to); + let _ = handler.msg_sender.send(upload_file::UploadFileMsg::ReqStatus(bid, up_to)); } } @@ -414,13 +414,13 @@ where lock.insert(handler.id, handler); } - Command::DownloadFileStatus(id, needed) => { + Command::DownloadFileStatus(id, bid, needed) => { let Ok(lock) = download_files.lock() else { continue; }; if let Some(handler) = lock.get(&id) { - let _ = handler.download_status.send(needed); + let _ = handler.download_status.send((bid, needed)); } } diff --git a/sparse-05/sparse-05-server/src/connection/download_file.rs b/sparse-05/sparse-05-server/src/connection/download_file.rs index dd7ce6c..7781cf0 100644 --- a/sparse-05/sparse-05-server/src/connection/download_file.rs +++ b/sparse-05/sparse-05-server/src/connection/download_file.rs @@ -8,7 +8,7 @@ use std::{ mpsc::{channel, Sender}, Arc, Mutex, }, - thread::Scope, + thread::Scope, time::Duration, }; use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE}; @@ -19,7 +19,7 @@ static CURRENT_FILE_DOWNLOAD_ID: AtomicU64 = AtomicU64::new(0); pub(super) struct DownloadFileHandler { pub id: u64, - pub download_status: Sender>, + pub download_status: Sender<(u64, Vec)>, } pub(super) fn start_file_download<'a, 'b: 'a>( @@ -41,6 +41,8 @@ pub(super) fn start_file_download<'a, 'b: 'a>( conninfo .send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile(id, packet_count))?)?; + let mut current_buffer = 0; + loop { let mut file_data: Vec> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; @@ -68,13 +70,24 @@ pub(super) fn start_file_download<'a, 'b: 'a>( } loop { - std::thread::sleep(std::time::Duration::from_millis(1000)); - conninfo.send(conninfo.encrypt_and_sign_resp( - Response::GetDownloadFileStatus(id, file_data.len() as u64), + Response::GetDownloadFileStatus(id, current_buffer, file_data.len() as u64), )?)?; - let buffers_needed: Vec = receive_download_status.recv()?; + let buffers_needed: Vec = loop { + let buffers = receive_download_status.recv_timeout(Duration::from_millis(125)); + + match buffers { + Ok((bid, buf)) if bid == current_buffer => { + break buf; + } + _ => { + conninfo.send(conninfo.encrypt_and_sign_resp( + Response::GetDownloadFileStatus(id, current_buffer, file_data.len() as u64), + )?)?; + } + } + }; if buffers_needed.is_empty() { break; @@ -90,6 +103,8 @@ pub(super) fn start_file_download<'a, 'b: 'a>( } } + current_buffer += 1; + if done { break; } diff --git a/sparse-05/sparse-05-server/src/connection/upload_file.rs b/sparse-05/sparse-05-server/src/connection/upload_file.rs index 1406754..42da16b 100644 --- a/sparse-05/sparse-05-server/src/connection/upload_file.rs +++ b/sparse-05/sparse-05-server/src/connection/upload_file.rs @@ -18,10 +18,14 @@ use super::ConnectionInformation; static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0); +pub(super) enum UploadFileMsg { + Data((u64, Vec)), + ReqStatus(u64, u64) +} + pub(super) struct UploadFileHandler { pub id: u64, - pub data_sender: Sender<(u64, Vec)>, - pub request_status: Sender, + pub msg_sender: Sender, } pub(super) fn start_file_upload<'a, 'b: 'a>( @@ -31,8 +35,7 @@ pub(super) fn start_file_upload<'a, 'b: 'a>( conninfo: ConnectionInformation, upload_file_map: Arc>>, ) -> anyhow::Result { - let (data_sender, data_receiver) = channel(); - let (request_status, receive_request_status) = channel(); + let (msg_sender, msg_receiver) = channel(); let id = CURRENT_FILE_UPLOAD_ID.fetch_add(1, Ordering::Relaxed); let id_2 = id; @@ -46,37 +49,47 @@ pub(super) fn start_file_upload<'a, 'b: 'a>( .truncate(true) .open(&file_path)?; let mut current_packet_count = 0; + let mut current_buffer = 0; while current_packet_count < packet_count { let mut buffers: Vec>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; loop { - let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else { - let up_to = receive_request_status.recv()?; - - let needed = buffers[..up_to as usize] - .iter() - .enumerate() - .flat_map(|(i, b)| match b { - Some(..) => None, - None => Some(i as u64), - }) - .collect::>(); - - let is_empty = needed.is_empty(); - - conninfo.send( - conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?, - )?; - - if is_empty { - current_packet_count += up_to; - break; - } else { - continue; + match msg_receiver.recv() { + Ok(UploadFileMsg::Data((i, buffer))) => { + buffers[i as usize] = Some(buffer); } - }; - buffers[i as usize] = Some(buffer); + Ok(UploadFileMsg::ReqStatus(bid, up_to)) if bid == current_buffer => { + let needed = buffers[..up_to as usize] + .iter() + .enumerate() + .flat_map(|(i, b)| match b { + Some(..) => None, + None => Some(i as u64), + }) + .collect::>(); + + let is_empty = needed.is_empty(); + + println!("Status requested {current_buffer}; packets needed: {:?}", needed); + + conninfo.send( + conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, current_buffer, needed.clone()))?, + )?; + + if is_empty { + current_packet_count += up_to; + current_buffer += 1; + break; + } else { + continue; + } + } + Ok(..) => {} + Err(e) => { + eprintln!("Error receiving packet: {e:?}"); + } + } } for buffer in buffers { @@ -95,7 +108,6 @@ pub(super) fn start_file_upload<'a, 'b: 'a>( Ok(UploadFileHandler { id: id_2, - data_sender, - request_status, + msg_sender }) }