feat: added upload file capability

This commit is contained in:
Andrew Rioux 2023-09-06 14:56:21 -04:00
parent 231108f2c3
commit 50b5c1a921
Signed by: andrew.rioux
GPG Key ID: 9B8BAC47C17ABB94
9 changed files with 277 additions and 5 deletions

View File

@ -23,7 +23,7 @@ args = ["run", "build", "build", "${@}"]
workspace = false
script = [
"docker-compose run build build --bin sparse-05-server ${@}",
"docker-compose run build build --bin sparse-05-server --target x86_64-pc-windows-gnu ${@}",
# "docker-compose run build build --bin sparse-05-server --target x86_64-pc-windows-gnu ${@}",
"docker-compose run build build --bin sparse-05-client ${@}",
]

View File

@ -0,0 +1,93 @@
use std::{path::PathBuf, sync::Arc};
use tokio::{
fs,
io::{self, AsyncReadExt},
};
use sparse_05_common::messages::{
Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE,
};
use crate::commands::connect::Connection;
pub async fn download_file(
conn: Arc<Connection>,
remote_file: PathBuf,
local_path: PathBuf,
) -> anyhow::Result<()> {
let mut file = fs::OpenOptions::new().read(true).open(&local_path).await?;
let file_size = file.metadata().await?.len();
let command = Command::StartUploadFile(
remote_path,
(file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1,
);
conn.send_command(command).await?;
let id = loop {
let resp = conn.get_response().await?;
if let Response::UploadFileID(id) = resp {
break id;
}
};
loop {
let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
let mut done = false;
for _ in 0..FILE_BUFFER_BUFFER_SIZE {
let amount = file.read(&mut buffer).await?;
if amount == 0 {
done = true;
break;
}
file_data.push(buffer[..amount].to_vec());
}
for (i, buffer) in file_data.iter().enumerate() {
let comm = Command::SendFileSegment(id, i as u64, buffer.clone());
conn.send_command(comm).await?;
}
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64))
.await?;
let buffers_needed = loop {
let resp = conn.get_response().await?;
if let Response::UploadFileStatus(iid, buffers) = resp {
if id == iid {
break buffers;
}
}
};
if buffers_needed.len() == 0 {
break;
}
for buffer_needed in &buffers_needed {
let comm = Command::SendFileSegment(
id,
*buffer_needed,
file_data[*buffer_needed as usize].clone(),
);
conn.send_command(comm).await?;
}
}
if done {
break;
}
}
Ok(())
}

View File

@ -1,5 +1,14 @@
use std::{path::PathBuf, sync::Arc};
use tokio::{
fs,
io::{self, AsyncReadExt},
};
use sparse_05_common::messages::{
Command, Response, FILE_BUFFER_BUFFER_SIZE, FILE_TRANSFER_PACKET_SIZE,
};
use crate::commands::connect::Connection;
pub async fn upload_file(
@ -7,5 +16,78 @@ pub async fn upload_file(
local_path: PathBuf,
remote_path: PathBuf,
) -> anyhow::Result<()> {
let mut file = fs::OpenOptions::new().read(true).open(&local_path).await?;
let file_size = file.metadata().await?.len();
let command = Command::StartUploadFile(
remote_path,
(file_size / FILE_TRANSFER_PACKET_SIZE as u64) + 1,
);
conn.send_command(command).await?;
let id = loop {
let resp = conn.get_response().await?;
if let Response::UploadFileID(id) = resp {
break id;
}
};
loop {
let mut file_data: Vec<Vec<u8>> = Vec::with_capacity(FILE_BUFFER_BUFFER_SIZE);
let mut buffer = [0u8; FILE_TRANSFER_PACKET_SIZE];
let mut done = false;
for _ in 0..FILE_BUFFER_BUFFER_SIZE {
let amount = file.read(&mut buffer).await?;
if amount == 0 {
done = true;
break;
}
file_data.push(buffer[..amount].to_vec());
}
for (i, buffer) in file_data.iter().enumerate() {
let comm = Command::SendFileSegment(id, i as u64, buffer.clone());
conn.send_command(comm).await?;
}
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
conn.send_command(Command::GetUploadStatus(id, file_data.len() as u64))
.await?;
let buffers_needed = loop {
let resp = conn.get_response().await?;
if let Response::UploadFileStatus(iid, buffers) = resp {
if id == iid {
break buffers;
}
}
};
if buffers_needed.len() == 0 {
break;
}
for buffer_needed in &buffers_needed {
let comm = Command::SendFileSegment(
id,
*buffer_needed,
file_data[*buffer_needed as usize].clone(),
);
conn.send_command(comm).await?;
}
}
if done {
break;
}
}
Ok(())
}

View File

@ -171,6 +171,7 @@ pub(super) async fn shell(
let Ok(input) = std::str::from_utf8(&cmd) else {
continue;
};
let input = input.trim();
let (args, help) = if input.starts_with("#help") {
(input.split(" ").collect::<Vec<_>>(), true)
@ -207,6 +208,20 @@ pub(super) async fn shell(
eprintln!("{e:?}")
}
}
(
Ok(SparseCommands::DownloadFile {
local_path,
remote_file,
}),
_,
) => {
if let Err(e) =
commands::download::download(Arc::clone(&connection), remote_file, local_path)
.await
{
eprintln!("{e:?}")
}
}
_ => {
if let Err(e) = run_command(
&mut stdin_receiver,

View File

@ -4,6 +4,8 @@ pub const CONFIG_SEPARATOR: &'static [u8] =
pub mod messages {
pub const CONNECT_MESSAGE: &'static [u8] = b"CONNECT";
pub const CONNECTED_MESSAGE: &'static [u8] = b"CONNECTED";
pub const FILE_TRANSFER_PACKET_SIZE: usize = 1024;
pub const FILE_BUFFER_BUFFER_SIZE: usize = 512;
use std::{ffi::OsString, path::PathBuf};
@ -25,9 +27,10 @@ pub mod messages {
StartUploadFile(PathBuf, u64),
SendFileSegment(u64, u64, Vec<u8>),
GetUploadStatus(u64, u64),
StartDownloadFile(PathBuf),
DownloadFileStatus(u64, Result<(), Vec<u64>>),
DownloadFileStatus(u64, Vec<u64>),
Disconnect,
}
@ -74,7 +77,7 @@ pub mod messages {
SendTTYData(u64, Vec<u64>),
UploadFileID(u64),
UploadFileStatus(u64, Result<(), Vec<u64>>),
UploadFileStatus(u64, Vec<u64>),
StartDownloadFile(u64),
DownloadFileSegment(u64, u64, Vec<u64>),

View File

@ -2,10 +2,14 @@ use std::ffi::c_int;
use sparse_05_common::messages::{Capabilities, OperatingSystem, TransportType};
#[cfg(target_os = "linux")]
const CAP_SETUID: u32 = 1 << 7;
#[cfg(target_os = "linux")]
const CAP_NET_RAW: u32 = 1 << 13;
#[cfg(target_os = "linux")]
const SYS_CAPGET: i64 = 125;
#[cfg(target_os = "linux")]
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug)]
@ -14,6 +18,7 @@ struct cap_user_header_t {
pid: c_int,
}
#[cfg(target_os = "linux")]
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug)]

View File

@ -317,6 +317,15 @@ where
let _ = handler.data_sender.send((number, bytes));
}
}
Command::GetUploadStatus(id, up_to) => {
let Ok(lock) = uploaded_files.lock() else {
continue;
};
if let Some(handler) = lock.get(&id) {
let _ = handler.request_status.send(up_to);
}
}
Command::StartDownloadFile(_) => {}
Command::DownloadFileStatus(_, _) => {}

View File

@ -1,5 +1,7 @@
use std::{
collections::HashMap,
fs::{self, OpenOptions},
io::Write,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
@ -7,8 +9,11 @@ use std::{
Arc, Mutex,
},
thread::Scope,
time::Duration,
};
use sparse_05_common::messages::{Response, FILE_BUFFER_BUFFER_SIZE};
use super::ConnectionInformation;
static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0);
@ -16,6 +21,7 @@ static CURRENT_FILE_UPLOAD_ID: AtomicU64 = AtomicU64::new(0);
pub(super) struct UploadFileHandler {
pub id: u64,
pub data_sender: Sender<(u64, Vec<u8>)>,
pub request_status: Sender<u64>,
}
pub(super) fn start_file_upload<'a, 'b: 'a>(
@ -26,10 +32,69 @@ pub(super) fn start_file_upload<'a, 'b: 'a>(
upload_file_map: Arc<Mutex<HashMap<u64, UploadFileHandler>>>,
) -> anyhow::Result<UploadFileHandler> {
let (data_sender, data_receiver) = channel();
let (request_status, receive_request_status) = channel();
let id = CURRENT_FILE_UPLOAD_ID.fetch_add(1, Ordering::Relaxed);
let id_2 = id;
let buffer: Vec<Vec<u8>> = Vec::with_capacity(packet_count as usize);
s.spawn(move || -> anyhow::Result<()> {
conninfo.send(conninfo.encrypt_and_sign_resp(Response::UploadFileID(id))?)?;
Ok(UploadFileHandler { id, data_sender })
let mut target_file = OpenOptions::new()
.write(true)
.create(true)
.open(&file_path)?;
let mut current_packet_count = 0;
while current_packet_count < packet_count {
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; FILE_BUFFER_BUFFER_SIZE];
loop {
let Ok((i, buffer)) = data_receiver.recv_timeout(Duration::from_millis(250)) else {
let up_to = receive_request_status.recv()?;
let needed = buffers[..up_to as usize]
.iter()
.enumerate()
.flat_map(|(i, b)| match b {
Some(..) => None,
None => Some(i as u64),
})
.collect::<Vec<_>>();
let is_empty = needed.is_empty();
conninfo.send(
conninfo.encrypt_and_sign_resp(Response::UploadFileStatus(id, needed))?,
)?;
if is_empty {
current_packet_count += up_to;
break;
} else {
continue;
}
};
buffers[i as usize] = Some(buffer);
}
for buffer in buffers {
let Some(buffer) = buffer else { break };
target_file.write(&buffer)?;
}
}
let Ok(mut lock) = upload_file_map.lock() else {
return Ok(());
};
lock.remove(&id);
Ok(())
});
Ok(UploadFileHandler {
id: id_2,
data_sender,
request_status,
})
}