fix: made uploads and downloads more consistent

This commit is contained in:
Andrew Rioux 2024-09-25 12:33:58 -04:00
parent 1dfd7e0499
commit 4ae9f38812
Signed by: andrew.rioux
GPG Key ID: 9B8BAC47C17ABB94
6 changed files with 103 additions and 59 deletions

View File

@ -30,6 +30,7 @@ pub async fn download_file(
}; };
let mut current_packet_count = 0; let mut current_packet_count = 0;
let mut current_buffer = 0;
while current_packet_count < packet_count { while current_packet_count < packet_count {
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE];
@ -39,7 +40,7 @@ pub async fn download_file(
Ok(Response::DownloadFileSegment(fid, bid, buf)) if fid == id => { Ok(Response::DownloadFileSegment(fid, bid, buf)) if fid == id => {
buffers[bid as usize] = Some(buf); buffers[bid as usize] = Some(buf);
} }
Ok(Response::GetDownloadFileStatus(fid, up_to)) if fid == id => { Ok(Response::GetDownloadFileStatus(fid, bid, up_to)) if fid == id && bid == current_buffer => {
let needed = buffers[..up_to as usize] let needed = buffers[..up_to as usize]
.iter() .iter()
.enumerate() .enumerate()
@ -51,9 +52,10 @@ pub async fn download_file(
let is_empty = needed.is_empty(); let is_empty = needed.is_empty();
conn.send_command(Command::DownloadFileStatus(id, needed)) conn.send_command(Command::DownloadFileStatus(id, bid, needed))
.await?; .await?;
current_buffer += 1;
if is_empty { if is_empty {
current_packet_count += up_to; current_packet_count += up_to;
break; break;

View File

@ -1,6 +1,6 @@
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{fs, io::AsyncReadExt}; use tokio::{fs, io::AsyncReadExt, time::timeout};
use sparse_05_common::messages::{ use sparse_05_common::messages::{
Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE, Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE,
@ -30,6 +30,8 @@ pub async fn upload_file(
} }
}; };
let mut current_buffer = 0;
loop { loop {
let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
@ -52,21 +54,30 @@ pub async fn upload_file(
} }
loop { loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; conn.send_command(Command::GetUploadStatus(id, current_buffer, file_data.len() as u64))
conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64))
.await?; .await?;
let buffers_needed = loop { println!("Requesting status...");
let resp = conn.get_response().await?;
if let Response::UploadFileStatus(iid, buffers) = resp { let buffers_needed = loop {
if id == iid { let resp = timeout(Duration::from_millis(125), conn.get_response()).await;
match resp {
Ok(Ok(Response::UploadFileStatus(iid, bid, buffers))) => {
if id == iid && bid == current_buffer {
break buffers; break buffers;
} }
} }
Ok(..) => {}
Err(..) => {
conn.send_command(Command::GetUploadStatus(id, current_buffer, file_data.len() as u64))
.await?;
}
}
}; };
println!("Sent buffer {current_buffer}; packets needed: {:?}", buffers_needed);
if buffers_needed.len() == 0 { if buffers_needed.len() == 0 {
break; break;
} }
@ -79,8 +90,12 @@ pub async fn upload_file(
); );
conn.send_command(comm).await?; conn.send_command(comm).await?;
} }
println!("Resent packets");
} }
current_buffer += 1;
if done { if done {
break; break;
} }

View File

@ -4,7 +4,7 @@ pub const CONFIG_SEPARATOR: &'static [u8] =
pub mod messages { pub mod messages {
pub const CONNECT_MESSAGE: &'static [u8] = b"CONNECT"; pub const CONNECT_MESSAGE: &'static [u8] = b"CONNECT";
pub const CONNECTED_MESSAGE: &'static [u8] = b"CONNECTED"; pub const CONNECTED_MESSAGE: &'static [u8] = b"CONNECTED";
pub const FILE_TRANSFER_PACKET_SIZE: usize = 1024; pub const FILE_TRANSFER_PACKET_SIZE: usize = 768;
pub const FILE_BUFFER_BUFFER_SIZE: usize = 512; pub const FILE_BUFFER_BUFFER_SIZE: usize = 512;
use std::{ffi::OsString, path::PathBuf}; use std::{ffi::OsString, path::PathBuf};
@ -27,10 +27,10 @@ pub mod messages {
StartUploadFile(PathBuf, u64), StartUploadFile(PathBuf, u64),
SendFileSegment(u64, u64, Vec<u8>), SendFileSegment(u64, u64, Vec<u8>),
GetUploadStatus(u64, u64), GetUploadStatus(u64, u64, u64),
StartDownloadFile(PathBuf), StartDownloadFile(PathBuf),
DownloadFileStatus(u64, Vec<u64>), DownloadFileStatus(u64, u64, Vec<u64>),
Disconnect, Disconnect,
} }
@ -77,10 +77,10 @@ pub mod messages {
SendTTYData(u64, Vec<u8>), SendTTYData(u64, Vec<u8>),
UploadFileID(u64), UploadFileID(u64),
UploadFileStatus(u64, Vec<u64>), UploadFileStatus(u64, u64, Vec<u64>),
StartDownloadFile(u64, u64), StartDownloadFile(u64, u64),
GetDownloadFileStatus(u64, u64), GetDownloadFileStatus(u64, u64, u64),
DownloadFileSegment(u64, u64, Vec<u8>), DownloadFileSegment(u64, u64, Vec<u8>),
} }

View File

@ -380,16 +380,16 @@ where
}; };
if let Some(handler) = lock.get(&id) { if let Some(handler) = lock.get(&id) {
let _ = handler.data_sender.send((number, bytes)); let _ = handler.msg_sender.send(upload_file::UploadFileMsg::Data((number, bytes)));
} }
} }
Command::GetUploadStatus(id, up_to) => { Command::GetUploadStatus(id, bid, up_to) => {
let Ok(lock) = uploaded_files.lock() else { let Ok(lock) = uploaded_files.lock() else {
continue; continue;
}; };
if let Some(handler) = lock.get(&id) { if let Some(handler) = lock.get(&id) {
let _ = handler.request_status.send(up_to); let _ = handler.msg_sender.send(upload_file::UploadFileMsg::ReqStatus(bid, up_to));
} }
} }
@ -414,13 +414,13 @@ where
lock.insert(handler.id, handler); lock.insert(handler.id, handler);
} }
Command::DownloadFileStatus(id, needed) => { Command::DownloadFileStatus(id, bid, needed) => {
let Ok(lock) = download_files.lock() else { let Ok(lock) = download_files.lock() else {
continue; continue;
}; };
if let Some(handler) = lock.get(&id) { if let Some(handler) = lock.get(&id) {
let _ = handler.download_status.send(needed); let _ = handler.download_status.send((bid, needed));
} }
} }

View File

@ -8,7 +8,7 @@ use std::{
mpsc::{channel, Sender}, mpsc::{channel, Sender},
Arc, Mutex, Arc, Mutex,
}, },
thread::Scope, thread::Scope, time::Duration,
}; };
use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE}; use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE};
@ -19,7 +19,7 @@ static CURRENT_FILE_DOWNLOAD_ID: AtomicU64 = AtomicU64::new(0);
pub(super) struct DownloadFileHandler { pub(super) struct DownloadFileHandler {
pub id: u64, pub id: u64,
pub download_status: Sender<Vec<u64>>, pub download_status: Sender<(u64, Vec<u64>)>,
} }
pub(super) fn start_file_download<'a, 'b: 'a>( pub(super) fn start_file_download<'a, 'b: 'a>(
@ -41,6 +41,8 @@ pub(super) fn start_file_download<'a, 'b: 'a>(
conninfo conninfo
.send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile(id, packet_count))?)?; .send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile(id, packet_count))?)?;
let mut current_buffer = 0;
loop { loop {
let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE); let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE]; let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
@ -68,13 +70,24 @@ pub(super) fn start_file_download<'a, 'b: 'a>(
} }
loop { loop {
std::thread::sleep(std::time::Duration::from_millis(1000));
conninfo.send(conninfo.encrypt_and_sign_resp( conninfo.send(conninfo.encrypt_and_sign_resp(
Response::GetDownloadFileStatus(id, file_data.len() as u64), Response::GetDownloadFileStatus(id, current_buffer, file_data.len() as u64),
)?)?; )?)?;
let buffers_needed: Vec<u64> = receive_download_status.recv()?; let buffers_needed: Vec<u64> = loop {
let buffers = receive_download_status.recv_timeout(Duration::from_millis(125));
match buffers {
Ok((bid, buf)) if bid == current_buffer => {
break buf;
}
_ => {
conninfo.send(conninfo.encrypt_and_sign_resp(
Response::GetDownloadFileStatus(id, current_buffer, file_data.len() as u64),
)?)?;
}
}
};
if buffers_needed.is_empty() { if buffers_needed.is_empty() {
break; break;
@ -90,6 +103,8 @@ pub(super) fn start_file_download<'a, 'b: 'a>(
} }
} }
current_buffer += 1;
if done { if done {
break; break;
} }

View File

@ -18,10 +18,14 @@ use super::ConnectionInformation;
static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0); static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0);
pub(super) enum UploadFileMsg {
Data((u64, Vec<u8>)),
ReqStatus(u64, u64)
}
pub(super) struct UploadFileHandler { pub(super) struct UploadFileHandler {
pub id: u64, pub id: u64,
pub data_sender: Sender<(u64, Vec<u8>)>, pub msg_sender: Sender<UploadFileMsg>,
pub request_status: Sender<u64>,
} }
pub(super) fn start_file_upload<'a, 'b: 'a>( pub(super) fn start_file_upload<'a, 'b: 'a>(
@ -31,8 +35,7 @@ pub(super) fn start_file_upload<'a, 'b: 'a>(
conninfo: ConnectionInformation, conninfo: ConnectionInformation,
upload_file_map: Arc<Mutex<HashMap<u64, UploadFileHandler>>>, upload_file_map: Arc<Mutex<HashMap<u64, UploadFileHandler>>>,
) -> anyhow::Result<UploadFileHandler> { ) -> anyhow::Result<UploadFileHandler> {
let (data_sender, data_receiver) = channel(); let (msg_sender, msg_receiver) = channel();
let (request_status, receive_request_status) = channel();
let id = CURRENT_FILE_UPLOAD_ID.fetch_add(1, Ordering::Relaxed); let id = CURRENT_FILE_UPLOAD_ID.fetch_add(1, Ordering::Relaxed);
let id_2 = id; let id_2 = id;
@ -46,14 +49,17 @@ pub(super) fn start_file_upload<'a, 'b: 'a>(
.truncate(true) .truncate(true)
.open(&file_path)?; .open(&file_path)?;
let mut current_packet_count = 0; let mut current_packet_count = 0;
let mut current_buffer = 0;
while current_packet_count < packet_count { while current_packet_count < packet_count {
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE];
loop { loop {
let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else { match msg_receiver.recv() {
let up_to = receive_request_status.recv()?; Ok(UploadFileMsg::Data((i, buffer))) => {
buffers[i as usize] = Some(buffer);
}
Ok(UploadFileMsg::ReqStatus(bid, up_to)) if bid == current_buffer => {
let needed = buffers[..up_to as usize] let needed = buffers[..up_to as usize]
.iter() .iter()
.enumerate() .enumerate()
@ -65,18 +71,25 @@ pub(super) fn start_file_upload<'a, 'b: 'a>(
let is_empty = needed.is_empty(); let is_empty = needed.is_empty();
println!("Status requested {current_buffer}; packets needed: {:?}", needed);
conninfo.send( conninfo.send(
conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?, conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, current_buffer, needed.clone()))?,
)?; )?;
if is_empty { if is_empty {
current_packet_count += up_to; current_packet_count += up_to;
current_buffer += 1;
break; break;
} else { } else {
continue; continue;
} }
}; }
buffers[i as usize] = Some(buffer); Ok(..) => {}
Err(e) => {
eprintln!("Error receiving packet: {e:?}");
}
}
} }
for buffer in buffers { for buffer in buffers {
@ -95,7 +108,6 @@ pub(super) fn start_file_upload<'a, 'b: 'a>(
Ok(UploadFileHandler { Ok(UploadFileHandler {
id: id_2, id: id_2,
data_sender, msg_sender
request_status,
}) })
} }