feat: continuing work on downloading files

This commit is contained in:
Andrew Rioux 2023-09-06 19:44:13 -04:00
parent 50b5c1a921
commit cd23ec1b80
Signed by: andrew.rioux
GPG Key ID: 9B8BAC47C17ABB94
5 changed files with 146 additions and 15 deletions

View File

@ -16,13 +16,13 @@ pub async fn download_file(
remote_file: PathBuf, remote_file: PathBuf,
local_path: PathBuf, local_path: PathBuf,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut file = fs::OpenOptions::new().read(true).open(&local_path).await?; let mut file = fs::OpenOptions::new()
let file_size = file.metadata().await?.len(); .write(true)
.create(true)
.open(&local_path)
.await?;
let command = Command::StartUploadFile( let command = Command::StartDownloadFile(remote_file);
remote_path,
(file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1,
);
conn.send_command(command).await?; conn.send_command(command).await?;
let id = loop { let id = loop {

View File

@ -215,8 +215,11 @@ pub(super) async fn shell(
}), }),
_, _,
) => { ) => {
if let Err(e) = if let Err(e) = commands::download::download_file(
commands::download::download(Arc::clone(&connection), remote_file, local_path) Arc::clone(&connection),
remote_file,
local_path,
)
.await .await
{ {
eprintln!("{e:?}") eprintln!("{e:?}")

View File

@ -74,13 +74,14 @@ pub mod messages {
OpenedTTY(u64), OpenedTTY(u64),
ClosedTTY(u64), ClosedTTY(u64),
SendTTYData(u64, Vec<u64>), SendTTYData(u64, Vec<u8>),
UploadFileID(u64), UploadFileID(u64),
UploadFileStatus(u64, Vec<u64>), UploadFileStatus(u64, Vec<u64>),
StartDownloadFile(u64), StartDownloadFile(u64, u64),
DownloadFileSegment(u64, u64, Vec<u64>), GetDownloadFileStatus(u64, Vec<u64>),
DownloadFileSegment(u64, u64, Vec<u8>),
} }
#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] #[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)]

View File

@ -217,6 +217,7 @@ fn authenticate<F: Fn()>(
} }
mod command; mod command;
mod download_file;
mod upload_file; mod upload_file;
fn handle_full_connection<F>( fn handle_full_connection<F>(
@ -232,7 +233,7 @@ where
let commands = Arc::new(Mutex::new(HashMap::new())); let commands = Arc::new(Mutex::new(HashMap::new()));
let uploaded_files = Arc::new(Mutex::new(HashMap::new())); let uploaded_files = Arc::new(Mutex::new(HashMap::new()));
/*let mut downloaded_files = HashMap::new();*/ let download_files = Arc::new(Mutex::new(HashMap::new()));
std::thread::scope(|s| -> anyhow::Result<()> { std::thread::scope(|s| -> anyhow::Result<()> {
loop { loop {
@ -327,8 +328,36 @@ where
} }
} }
Command::StartDownloadFile(_) => {} Command::StartDownloadFile(path) => {
Command::DownloadFileStatus(_, _) => {} let download_files_clone = download_files.clone();
let Ok(mut lock) = download_files.lock() else {
continue;
};
let handler = match download_file::start_file_download(
&s,
path,
conninfo.clone(),
download_files_clone,
) {
Ok(handler) => handler,
Err(e) => {
eprintln!("error starting file upload: {e:?}");
continue;
}
};
lock.insert(handler.id, handler);
}
Command::DownloadFileStatus(id, needed) => {
let Ok(lock) = download_files.lock() else {
continue;
};
if let Some(handler) = lock.get(&id) {
let _ = handler.download_status.send(needed);
}
}
Command::Disconnect => { Command::Disconnect => {
break; break;

View File

@ -0,0 +1,98 @@
use std::{
collections::HashMap,
fs::{self, OpenOptions},
io::Write,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
mpsc::{channel, Sender},
Arc, Mutex,
},
thread::Scope,
time::Duration,
};
use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE};
use super::ConnectionInformation;
static CURRENT_FILE_DOWNLOAD_ID: AtomicU64 = AtomicU64::new(0);
pub(super) struct DownloadFileHandler {
pub id: u64,
pub download_status: Sender<Vec<u64>>,
}
pub(super) fn start_file_download<'a, 'b: 'a>(
s: &'a Scope<'a, 'b>,
file_path: PathBuf,
conninfo: ConnectionInformation,
download_file_map: Arc<Mutex<HashMap<u64, DownloadFileHandler>>>,
) -> anyhow::Result<DownloadFileHandler> {
let (download_status, receive_download_status) = channel();
let id = CURRENT_FILE_DOWNLOAD_ID.fetch_add(1, Ordering::Relaxed);
let id_2 = id;
s.spawn(move || -> anyhow::Result<()> {
let mut file = OpenOptions::new().read(true).open(&file_path)?;
let file_size = file.metadata()?.len();
conninfo.send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile(
id,
(file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1,
))?)?;
let mut current_packet_count = 0;
while current_packet_count < packet_count {
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE];
loop {
let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else {
let up_to = receive_request_status.recv()?;
let needed = buffers[..up_to as usize]
.iter()
.enumerate()
.flat_map(|(i, b)| match b {
Some(..) => None,
None => Some(i as u64),
})
.collect::<Vec<_>>();
let is_empty = needed.is_empty();
conninfo.send(
conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?,
)?;
if is_empty {
current_packet_count += up_to;
break;
} else {
continue;
}
};
buffers[i as usize] = Some(buffer);
}
for buffer in buffers {
let Some(buffer) = buffer else { break };
target_file.write(&buffer)?;
}
}
let Ok(mut lock) = download_file_map.lock() else {
return Ok(());
};
lock.remove(&id);
Ok(())
});
Ok(DownloadFileHandler {
id: id_2,
download_status,
})
}