feat: added ability to download files

This commit is contained in:
Andrew Rioux 2023-09-08 21:25:20 -04:00
parent 239c5ccc40
commit eb5e86067b
Signed by: andrew.rioux
GPG Key ID: 9B8BAC47C17ABB94
4 changed files with 91 additions and 89 deletions

View File

@ -1,13 +1,8 @@
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use tokio::{ use tokio::{fs, io::AsyncWriteExt};
fs,
io::{self, AsyncReadExt},
};
use sparse_05_common::messages::{ use sparse_05_common::messages::{Command, Response, FILE_BUFFER_BUFFER_SIZE};
Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE,
};
use crate::commands::connect::Connection; use crate::commands::connect::Connection;
@ -16,7 +11,7 @@ pub async fn download_file(
remote_file: PathBuf, remote_file: PathBuf,
local_path: PathBuf, local_path: PathBuf,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut file = fs::OpenOptions::new() let mut target_file = fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
.open(&local_path) .open(&local_path)
@ -25,67 +20,54 @@ pub async fn download_file(
let command = Command::StartDownloadFile(remote_file); let command = Command::StartDownloadFile(remote_file);
conn.send_command(command).await?; conn.send_command(command).await?;
let id = loop { let (id, packet_count) = loop {
let resp = conn.get_response().await?; let resp = conn.get_response().await?;
if let Response::UploadFileID(id) = resp { if let Response::StartDownloadFile(id, packet_count) = resp {
break id; break (id, packet_count);
} }
}; };
loop { let mut current_packet_count = 0;
let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
let mut done = false;
for _ in 0..FILE_BUFFER_BUFFER_SIZE { while current_packet_count < packet_count {
let amount = file.read(&mut buffer).await?; let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE];
if amount == 0 {
done = true;
break;
}
file_data.push(buffer[..amount].to_vec());
}
for (i, buffer) in file_data.iter().enumerate() {
let comm = Command::SendFileSegment(id, i as u64, buffer.clone());
conn.send_command(comm).await?;
}
loop { loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; match conn.get_response().await {
Ok(Response::DownloadFileSegment(fid, bid, buf)) if fid == id => {
buffers[bid as usize] = Some(buf);
}
Ok(Response::GetDownloadFileStatus(fid, up_to)) if fid == id => {
let needed = buffers[..up_to as usize]
.iter()
.enumerate()
.flat_map(|(i, b)| match b {
Some(..) => None,
None => Some(i as u64),
})
.collect::<Vec<_>>();
conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64)) let is_empty = needed.is_empty();
conn.send_command(Command::DownloadFileStatus(id, needed))
.await?; .await?;
let buffers_needed = loop { if is_empty {
let resp = conn.get_response().await?; current_packet_count += up_to;
if let Response::UploadFileStatus(iid, buffers) = resp {
if id == iid {
break buffers;
}
}
};
if buffers_needed.len() == 0 {
break; break;
} else {
continue;
} }
}
for buffer_needed in &buffers_needed { Ok(..) => {}
let comm = Command::SendFileSegment( Err(..) => {}
id,
*buffer_needed,
file_data[*buffer_needed as usize].clone(),
);
conn.send_command(comm).await?;
} }
} }
if done { for buffer in buffers {
break; let Some(buffer) = buffer else { break };
target_file.write(&buffer).await?;
} }
} }

View File

@ -3,13 +3,13 @@ use std::{
os::fd::AsRawFd, os::fd::AsRawFd,
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
thread::{self, scope}, thread,
}; };
use sparse_05_common::messages::{Capabilities, Command, Response}; use sparse_05_common::messages::{Capabilities, Command, Response};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{ use tokio::{
io::{stderr, stdout, AsyncReadExt, AsyncWriteExt}, io::{stderr, stdout, AsyncWriteExt},
runtime::Handle, runtime::Handle,
sync::mpsc::{channel, Receiver}, sync::mpsc::{channel, Receiver},
}; };
@ -36,6 +36,10 @@ pub enum SparseCommands {
remote_file: PathBuf, remote_file: PathBuf,
local_path: PathBuf, local_path: PathBuf,
}, },
#[structopt(name = "#edit")]
EditFile {
remote_path: PathBuf,
},
} }
macro_rules! libc_try { macro_rules! libc_try {
@ -225,6 +229,7 @@ pub(super) async fn shell(
eprintln!("{e:?}") eprintln!("{e:?}")
} }
} }
(Ok(SparseCommands::EditFile { remote_path }), _) => {}
_ => { _ => {
if let Err(e) = run_command( if let Err(e) = run_command(
&mut stdin_receiver, &mut stdin_receiver,

View File

@ -80,7 +80,7 @@ pub mod messages {
UploadFileStatus(u64, Vec<u64>), UploadFileStatus(u64, Vec<u64>),
StartDownloadFile(u64, u64), StartDownloadFile(u64, u64),
GetDownloadFileStatus(u64, Vec<u64>), GetDownloadFileStatus(u64, u64),
DownloadFileSegment(u64, u64, Vec<u8>), DownloadFileSegment(u64, u64, Vec<u8>),
} }

View File

@ -1,7 +1,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
fs::{self, OpenOptions}, fs::{self, OpenOptions},
io::Write, io::Read,
path::PathBuf, path::PathBuf,
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
@ -38,50 +38,65 @@ pub(super) fn start_file_download<'a, 'b: 'a>(
let mut file = OpenOptions::new().read(true).open(&file_path)?; let mut file = OpenOptions::new().read(true).open(&file_path)?;
let file_size = file.metadata()?.len(); let file_size = file.metadata()?.len();
conninfo.send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile( let packet_count = (file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1;
id, conninfo
(file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1, .send(conninfo.encrypt_and_sign_resp(Response::StartDownloadFile(id, packet_count))?)?;
))?)?;
let mut current_packet_count = 0; let mut current_packet_count = 0;
/*while current_packet_count < packet_count { loop {
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE]; let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
let mut done = false;
for _ in 0..FILE_BUFFER_BUFFER_SIZE {
let amount = file.read(&mut buffer)?;
if amount == 0 {
done = true;
break;
}
file_data.push(buffer[..amount].to_vec());
}
for (i, buffer) in file_data.iter().enumerate() {
conninfo.send(
conninfo.encrypt_and_sign_resp(Response::DownloadFileSegment(
id,
i as u64,
buffer.clone(),
))?,
)?;
}
loop { loop {
let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else { std::thread::sleep(std::time::Duration::from_millis(1000));
let up_to = receive_request_status.recv()?;
let needed = buffers[..up_to as usize] conninfo.send(conninfo.encrypt_and_sign_resp(
.iter() Response::GetDownloadFileStatus(id, file_data.len() as u64),
.enumerate() )?)?;
.flat_map(|(i, b)| match b {
Some(..) => None,
None => Some(i as u64),
})
.collect::<Vec<_>>();
let is_empty = needed.is_empty(); let buffers_needed: Vec<u64> = receive_download_status.recv()?;
conninfo.send( if buffers_needed.is_empty() {
conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?,
)?;
if is_empty {
current_packet_count += up_to;
break; break;
} else {
continue;
}
};
buffers[i as usize] = Some(buffer);
} }
for buffer in buffers { for buffer_needed in &buffers_needed {
let Some(buffer) = buffer else { break }; let resp = Response::DownloadFileSegment(
target_file.write(&buffer)?; id,
*buffer_needed,
file_data[*buffer_needed as usize].clone(),
);
conninfo.send(conninfo.encrypt_and_sign_resp(resp)?)?;
}
}
if done {
break;
}
} }
}*/
let Ok(mut lock) = download_file_map.lock() else { let Ok(mut lock) = download_file_map.lock() else {
return Ok(()); return Ok(());