diff --git a/.devcontainer/Dockerfile.alpine b/.devcontainer/Dockerfile.alpine index 729edd5..fd373f8 100644 --- a/.devcontainer/Dockerfile.alpine +++ b/.devcontainer/Dockerfile.alpine @@ -13,7 +13,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM rust:1-alpine +FROM rust:1.73-alpine RUN apk add cmake make automake musl-dev autoconf libtool libcap \ flex bison linux-headers openssl-dev lldb build-base libcap-dev diff --git a/Cargo.lock b/Cargo.lock index e438c5e..4731eae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,15 +86,6 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "atomic-polyfill" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28" -dependencies = [ - "critical-section", -] - [[package]] name = "atty" version = "0.2.14" @@ -247,10 +238,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" [[package]] -name = "critical-section" -version = "1.1.2" +name = "crossbeam-utils" +version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] [[package]] name = "crypto-mac" @@ -284,38 +278,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "defmt" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2d011b2fee29fb7d659b83c43fce9a2cb4df453e16d441a51448e448f3f98" -dependencies = [ - "bitflags 1.3.2", - "defmt-macros", -] - -[[package]] -name = "defmt-macros" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54f0216f6c5acb5ae1a47050a6645024e6edafc2ee32d421955eccfef12ef92e" -dependencies = [ - "defmt-parser", - "proc-macro-error", - "proc-macro2 1.0.66", - "quote 1.0.33", - "syn 2.0.29", -] - -[[package]] -name = "defmt-parser" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "269924c02afd7f94bc4cecbfa5c379f6ffcf9766b3408fe63d22c728654eccd0" -dependencies = [ - "thiserror", -] - [[package]] name = "derive_more" version = "0.14.1" @@ -324,7 +286,7 @@ checksum = "6d944ac6003ed268757ef1ee686753b57efc5fcf0ebe7b64c9fc81e7e32ff839" dependencies = [ "proc-macro2 0.4.30", "quote 0.6.13", - "rustc_version 0.2.3", + "rustc_version", "syn 0.15.44", ] @@ -608,28 +570,6 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" -[[package]] -name = "hash32" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" -dependencies = [ - "byteorder", -] - -[[package]] -name = "heapless" -version = "0.7.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" -dependencies = [ - "atomic-polyfill", - "hash32", - "rustc_version 0.4.0", - "spin", - "stable_deref_trait", -] - [[package]] name = "heck" version = "0.3.3" @@ -722,12 +662,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" - -[[package]] -name = "managed" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" +dependencies = [ + "value-bag", +] [[package]] name = "memchr" @@ -1060,6 +997,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ringbuf" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "rmp" version = "0.8.12" @@ -1094,16 +1040,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" dependencies = [ - "semver 0.9.0", -] - -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver 1.0.18", + "semver", ] [[package]] @@ -1134,12 +1071,6 @@ dependencies = [ "semver-parser", ] -[[package]] -name = "semver" -version = "1.0.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" - [[package]] name = "semver-parser" version = "0.7.0" @@ -1241,22 +1172,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" -[[package]] -name = "smoltcp" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2e3a36ac8fea7b94e666dfa3871063d6e0a5c9d5d4fec9a1a6b7b6760f0229" -dependencies = [ - "bitflags 1.3.2", - "byteorder", - "cfg-if", - "defmt", - "heapless", - "libc", - "log", - "managed", -] - [[package]] name = "socket2" version = "0.5.3" @@ -1324,21 +1239,6 @@ dependencies = [ "pcap-sys", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] - -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "strsim" version = "0.8.0" @@ -1420,8 +1320,8 @@ dependencies = [ "packets", "pcap-sys", "rand 0.8.5", + "ringbuf", "simple_logger", - "smoltcp", "tokio", "tokio-stream", ] @@ -1592,6 +1492,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "value-bag" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/Makefile.toml b/Makefile.toml index 4fc2f0c..76df7c2 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -82,6 +82,13 @@ command = "convco" args = ["commit", "-i"] dependencies = ["git-pre-commit", "git-add"] +[tasks.git-share-root] +workspace = false +script = [ + "git config --global --add safe.directory /workspaces/sparse/nl-sys/libnl", + "git config --global --add safe.directory /workspaces/sparse/pcap-sys/libpcap", +] + #--------------------------------- # # Project setup tasks @@ -90,7 +97,7 @@ dependencies = ["git-pre-commit", "git-add"] [tasks.setup] workspace = false -dependencies = ["setup-pull-rust-image", "setup-update-submodules"] +dependencies = ["git-share-root", "setup-pull-rust-image", "setup-update-submodules"] [tasks.setup-pull-rust-image] workspace = false diff --git a/examples/bind-shell/backdoor/src/main.rs b/examples/bind-shell/backdoor/src/main.rs index c857e38..3e84f31 100644 --- a/examples/bind-shell/backdoor/src/main.rs +++ b/examples/bind-shell/backdoor/src/main.rs @@ -138,7 +138,9 @@ async fn handle_command( ) -> anyhow::Result<()> { use pcap_sys::packets::*; let eth_pkt = eth.pkt(); - let Layer3Pkt::IPv4Pkt(ip_pkt) = eth_pkt.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = eth_pkt.get_layer3_pkt()? else { + return Ok(()); + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; diff --git a/packets/src/lib.rs b/packets/src/lib.rs index c6ba82a..ba4f3a2 100644 --- a/packets/src/lib.rs +++ b/packets/src/lib.rs @@ -628,7 +628,7 @@ impl TCPPacket { } } -#[derive(Default)] +#[derive(Default, Debug)] pub struct TCPPacketBuilder { srcport: Option, dstport: Option, @@ -762,7 +762,7 @@ impl TCPPacketBuilder { let checksum = (checksum >> 16) + (checksum & 0xffff); let checksum = ((checksum >> 16) as u16) + (checksum as u16); - let checksum = dbg!(!checksum).to_be_bytes(); + let checksum = (!checksum).to_be_bytes(); //bytes[16] = checksum[0]; //bytes[17] = checksum[1]; diff --git a/pcap-sys/build.rs b/pcap-sys/build.rs index 115ba60..727da70 100644 --- a/pcap-sys/build.rs +++ b/pcap-sys/build.rs @@ -32,5 +32,5 @@ fn main() { // panic!("hahahahah test {}", dst.display()); println!("cargo:rustc-link-search=native={}/lib", dst.display()); - println!("cargo:rustc-link-lib=static=pcap"); + println!("cargo:rustc-link-lib=pcap"); } diff --git a/sparse-05/sparse-05-server/src/connection.rs b/sparse-05/sparse-05-server/src/connection.rs index f78c260..f91dbc9 100644 --- a/sparse-05/sparse-05-server/src/connection.rs +++ b/sparse-05/sparse-05-server/src/connection.rs @@ -104,7 +104,9 @@ pub fn spawn_connection_handler( use packets::*; let packet = connection_packet.pkt(); - let Layer3Pkt::IPv4Pkt(ip_pkt) = packet.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = packet.get_layer3_pkt()? else { + todo!() + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; @@ -199,7 +201,9 @@ fn authenticate( Ok(p) => { use packets::*; let p = p.pkt(); - let Layer3Pkt::IPv4Pkt(ip_pkt) = p.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = p.get_layer3_pkt()? else { + todo!() + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; @@ -249,7 +253,9 @@ where let msg = packet_handler.recv()?; let pkt = msg.pkt(); - let Layer3Pkt::IPv4Pkt(ip_pkt) = pkt.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = pkt.get_layer3_pkt()? else { + todo!() + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; diff --git a/sparse-05/sparse-05-server/src/interface.rs b/sparse-05/sparse-05-server/src/interface.rs index eda0f61..e0c7f3e 100644 --- a/sparse-05/sparse-05-server/src/interface.rs +++ b/sparse-05/sparse-05-server/src/interface.rs @@ -110,7 +110,9 @@ impl InterfaceSender { Self::RawUdp(interf) => Ok(interf.sendpacket(packet)?), Self::Udp(interf) => { use packets::*; - let Layer3Pkt::IPv4Pkt(ip_pkt) = packet.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = packet.get_layer3_pkt()? else { + todo!() + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; diff --git a/sparse-05/sparse-05-server/src/main.rs b/sparse-05/sparse-05-server/src/main.rs index d5af914..46f59df 100644 --- a/sparse-05/sparse-05-server/src/main.rs +++ b/sparse-05/sparse-05-server/src/main.rs @@ -74,7 +74,9 @@ fn main() -> anyhow::Result<()> { let pkt = pkt.pkt(); - let Layer3Pkt::IPv4Pkt(ip_pkt) = pkt.get_layer3_pkt()?; + let Layer3Pkt::IPv4Pkt(ip_pkt) = pkt.get_layer3_pkt()? else { + todo!() + }; let Layer4Pkt::UDP(udp_pkt) = ip_pkt.get_layer4_packet()? else { todo!() }; diff --git a/tcp-test/client/Cargo.toml b/tcp-test/client/Cargo.toml index 853cbcb..599d410 100644 --- a/tcp-test/client/Cargo.toml +++ b/tcp-test/client/Cargo.toml @@ -13,7 +13,8 @@ rand = "0.8.5" tokio = { version = "1.32.0", features = ["full"] } anyhow = "1.0.75" tokio-stream = { version = "0.1.14", features = ["full"] } -smoltcp = { version = "0.10", features = ["socket-tcp", "phy-raw_socket", "std", "async", "medium-ethernet", "proto-ipv4", "reassembly-buffer-size-65536", "fragmentation-buffer-size-65536", "proto-ipv4-fragmentation", "log", "verbose"] } +#smoltcp = { version = "0.10", features = ["socket-tcp", "phy-raw_socket", "std", "async", "medium-ethernet", "proto-ipv4", "reassembly-buffer-size-65536", "fragmentation-buffer-size-65536", "proto-ipv4-fragmentation", "log", "verbose"] } libc = "0.2.148" -log = "0.4.20" +log = { version = "0.4.20", features = [ "kv_unstable" ] } simple_logger = "4.2.0" +ringbuf = "0.3.3" diff --git a/tcp-test/client/src/main.rs b/tcp-test/client/src/main.rs index bbb8c1a..00c27bf 100644 --- a/tcp-test/client/src/main.rs +++ b/tcp-test/client/src/main.rs @@ -1,12 +1,15 @@ use std::{ collections::VecDeque, + mem::MaybeUninit, net::{Ipv4Addr, SocketAddr}, - time::Duration, + time::{Duration, Instant}, }; -use anyhow::{anyhow, bail}; +use anyhow::{anyhow, bail, Context}; +use ringbuf::LocalRb; +use tokio::{sync::mpsc, task, time::sleep_until}; +use tokio_stream::StreamExt; -use log::{debug, error, info, trace}; use nl_sys::{netlink, route}; use packets::{ @@ -14,8 +17,6 @@ use packets::{ Layer3Pkt, Layer4Packet, Layer4Pkt, TCPPacket, TCPPacketBuilder, TCPPkt, }; use pcap_sys; -use tokio::{sync::mpsc, task}; -use tokio_stream::StreamExt; struct PeerInfo { remote_addr: Ipv4Addr, @@ -24,10 +25,17 @@ struct PeerInfo { local_port: u16, } +enum Timeout { + Idle { heartbeat: Option }, + Retransmit {}, +} + #[derive(Default)] struct TcpSocket { - tx_buffer: VecDeque, - rx_buffer: VecDeque, + tx_buffer: LocalRb; 65536]>, + rx_buffer: LocalRb; 65536]>, + first_ack: u32, + first_rack: u32, local_seq_no: u32, remote_seq_no: u32, remote_last_ack: Option, @@ -36,9 +44,12 @@ struct TcpSocket { local_rx_last_ack: Option, state: TcpState, peer_info: Option, - ack_timeout: Option, + retransmit_timeout: Option, + last_packet_received: Option, } +impl TcpSocket {} + #[derive(Default)] enum TcpState { Listen, @@ -82,11 +93,12 @@ fn socket_accepts_packet(socket: &TcpSocket, ip: &IPv4Pkt<'_>, tcp: &TCPPkt<'_>) } } -fn connect( +async fn connect( socket: &mut TcpSocket, remote_addr: Ipv4Addr, remote_port: u16, local_addr: Ipv4Addr, + send_state_changed: &mpsc::Sender, ) -> (u16, TCPPacket) { socket.state = TcpState::SynSent; let local_port: u16 = loop { @@ -95,6 +107,7 @@ fn connect( break port; } }; + let _ = send_state_changed.send(TcpState::SynSent).await; socket.peer_info = Some(PeerInfo { remote_addr, remote_port, @@ -103,6 +116,7 @@ fn connect( }); socket.local_seq_no = rand::random(); + socket.first_ack = socket.local_seq_no; let packet = TCPPacketBuilder::default() .srcport(local_port) @@ -116,31 +130,213 @@ fn connect( (local_port, packet) } -fn process_packet( +fn process_timeout(socket: &mut TcpSocket) -> anyhow::Result)>> { + Ok(None) +} + +async fn process_packet( socket: &mut TcpSocket, packet: TCPPkt<'_>, -) -> anyhow::Result<(Option<(TCPPacketBuilder, Vec)>, Option>)> { - match socket.state { - TcpState::SynSent if packet.ack() && packet.syn() => {} - _ => {} + send_state_changed: &mpsc::Sender, +) -> anyhow::Result<(Vec<(TCPPacketBuilder, Vec)>, Option>)> { + match (&socket.state, &socket.peer_info) { + (TcpState::SynSent, Some(peer)) if packet.ack() && packet.syn() => { + log::debug!("established connection with peer"); + + socket.remote_last_ack = Some(socket.local_seq_no); + socket.remote_seq_no = packet.seqnumber() + 1; + socket.remote_last_win = packet.window(); + socket.first_rack = packet.seqnumber(); + socket.local_seq_no += 1; + + let tcppacketbuilder = TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .ack(true) + .window(512) + .acknumber(socket.remote_seq_no) + .seqnumber(socket.local_seq_no); + + socket.state = TcpState::Established; + let _ = send_state_changed.send(TcpState::Established).await; + + Ok((vec![(tcppacketbuilder, vec![])], None)) + } + (TcpState::Established, Some(peer)) if packet.fin() => { + log::debug!("Received fin!"); + + socket.remote_last_ack = Some(socket.local_seq_no); + socket.remote_last_win = packet.window(); + socket.remote_seq_no += 1; + + socket.state = TcpState::CloseWait; + let _ = send_state_changed.send(TcpState::CloseWait).await; + + let ackbuilder = TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .ack(true) + .window(512) + .acknumber(socket.remote_seq_no) + .seqnumber(socket.local_seq_no); + + socket.local_seq_no += 1; + + let finbuilder = TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .ack(true) + .window(512) + .acknumber(socket.remote_seq_no) + .seqnumber(socket.local_seq_no); + + Ok((vec![(ackbuilder, vec![]), (finbuilder, vec![])], None)) + } + (TcpState::Established, Some(peer)) if packet.ack() => { + log::debug!("received packet from server",); + + socket.remote_last_ack = Some(socket.local_seq_no); + socket.remote_last_win = packet.window(); + socket.remote_seq_no += (packet.data().len() & 0xFFFFFFFF) as u32; + + let tcppacketbuilder = TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .ack(true) + .window(512) + .acknumber(socket.remote_seq_no) + .seqnumber(socket.local_seq_no); + + if !packet.data().is_empty() { + log::debug!("received data: {:?}", packet.data()); + } + + Ok(( + vec![(tcppacketbuilder, vec![])], + match packet.data() { + [] => None, + da => Some(da.to_owned()), + }, + )) + } + (TcpState::FinWait1, _) if packet.ack() => { + socket.state = TcpState::FinWait2; + let _ = send_state_changed.send(TcpState::FinWait2).await; + + Ok((vec![], None)) + } + (TcpState::FinWait2, Some(peer)) if packet.fin() => { + socket.state = TcpState::Closed; + let _ = send_state_changed.send(TcpState::Closed).await; + + socket.remote_last_ack = Some(socket.local_seq_no); + socket.remote_seq_no = packet.seqnumber() + 1; + socket.remote_last_win = packet.window(); + socket.local_seq_no += 1; + + let tcppacketbuilder = TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .ack(true) + .window(512) + .acknumber(socket.remote_seq_no) + .seqnumber(socket.local_seq_no); + + Ok((vec![(tcppacketbuilder, vec![])], None)) + } + _ => Ok((vec![], None)), } - Ok((None, None)) +} + +fn resp_tcp(incoming: &TCPPkt<'_>) -> TCPPacketBuilder { + TCPPacketBuilder::default() + .srcport(incoming.dstport()) + .dstport(incoming.srcport()) +} + +fn seq_tcp(socket: &TcpSocket, mut builder: TCPPacketBuilder) -> TCPPacketBuilder { + builder +} + +fn recv_data(socket: &mut TcpSocket, data: &mut [u8]) -> anyhow::Result { + let (mut prod, mut cons) = socket.tx_buffer.split_ref(); + + let bytes_read = cons.pop_slice(data); + socket.remote_seq_no += (bytes_read & 0xFFFFFFFF) as u32; + + Ok(bytes_read) } fn send_data( socket: &mut TcpSocket, data: Vec, -) -> anyhow::Result)>> { - Ok(None) +) -> anyhow::Result)>> { + let (mut prod, cons) = socket.tx_buffer.split_ref(); + + if cons.is_empty() { + _ = prod.push_iter(&mut data.into_iter()); + + Ok(vec![]) + } else { + _ = prod.push_iter(&mut data.into_iter()); + Ok(vec![]) + } +} + +fn close_connection(socket: &mut TcpSocket) -> anyhow::Result { + socket.state = TcpState::FinWait1; + + let peer = socket + .peer_info + .as_ref() + .context("no connection to close")?; + + Ok(TCPPacketBuilder::default() + .srcport(peer.local_port) + .dstport(peer.remote_port) + .fin(true) + .ack(true) + .window(512) + .acknumber( + socket + .local_rx_last_ack + .context("information from synchronizing missing")?, + ) + .seqnumber(socket.local_seq_no) + .build(peer.local_addr, peer.remote_addr, vec![], None)) } struct TcpSocketHandle { + state_changed: mpsc::Receiver, send_channel: mpsc::Sender>, + close: mpsc::Sender<()>, receiver_channel: mpsc::Receiver>, } async fn use_socket(mut socket_handle: TcpSocketHandle) { - _ = socket_handle.send_channel.send(b"ping".to_vec()).await; + loop { + let state = socket_handle.state_changed.recv().await; + match state { + Some(TcpState::Established) => break, + _ => {} + } + } + + log::info!("Connected to server!"); + + log::info!("disconnecting!"); + + /*_ = socket_handle.close.send(()).await; + + loop { + let state = socket_handle.state_changed.recv().await; + match state { + Some(TcpState::Closed) => break, + _ => {} + } + }*/ + + log::info!("disconnected!"); match socket_handle.receiver_channel.recv().await { Some(bytes) => match std::str::from_utf8(&bytes) { @@ -155,6 +351,10 @@ async fn use_socket(mut socket_handle: TcpSocketHandle) { log::error!("could not get packets from server") } } + + /*_ = socket_handle.send_channel.send(b"pong".to_vec()).await; + + log::info!("Sent 'pong'!");*/ } #[tokio::main] @@ -183,7 +383,6 @@ async fn main() -> anyhow::Result<()> { }; let mut socket = TcpSocket::default(); - let mut interface = pcap_sys::Interface::::new(&ifname)?; let srcip: u32 = srcip.into(); @@ -207,7 +406,9 @@ async fn main() -> anyhow::Result<()> { }}; } - let (port, packet) = connect(&mut socket, ip, 54248, srcip); + let (send_state_changed, mut receive_state_changed) = mpsc::channel(64); + let (port, packet) = connect(&mut socket, ip, 54248, srcip, &send_state_changed).await; + _ = receive_state_changed.recv().await; interface.set_filter(&format!("arp or (inbound and tcp port {port})"), true, None)?; @@ -221,16 +422,32 @@ async fn main() -> anyhow::Result<()> { let (send_channel, mut send_packet) = mpsc::channel(1024); let (receive_packet, receiver_channel) = mpsc::channel(1024); + let (send_state_changed, state_changed) = mpsc::channel(1024); + let (close, mut recv_close) = mpsc::channel(16); let socket_handle = TcpSocketHandle { + state_changed, receiver_channel, send_channel, + close, }; task::spawn(async move { use_socket(socket_handle) }); + let mut packet_queue = VecDeque::new(); + loop { - let Some(packet) = tokio::select! { + let deadline = Instant::now() + + socket + .retransmit_timeout + .unwrap_or_else(|| Duration::from_millis(10)); + + tokio::select! { + _ = sleep_until(deadline.into()) => { + let Ok(Some((b, d))) = process_timeout(&mut socket) else { continue; }; + let pkt = format_packet!(b.build(srcip, ip, d, None)); + packet_queue.push_back(pkt); + }, Some(Ok(bytes)) = packets.next() => { let pkt = bytes.pkt(); @@ -242,13 +459,17 @@ async fn main() -> anyhow::Result<()> { continue; } - let Ok((to_send, received)) = process_packet(&mut socket, tcp_pkt) else { continue; }; + let Ok((to_send, received)) = process_packet(&mut socket, tcp_pkt, &send_state_changed).await else { continue; }; if let Some(received) = received { _ = receive_packet.send(received).await; } - to_send.map(|(b, d)| format_packet!(b.build(srcip, ip, d, None))) + for (b, d) in to_send { + log::trace!("adding packet to send: {b:?}"); + let pkt = format_packet!(b.build(srcip, ip, d, None)); + packet_queue.push_back(pkt); + } }, Ok(Layer3Pkt::ARP(arp)) => { if arp.opcode() != 1 || arp.plen() != 4 || arp.hwlen() != 6 { @@ -265,23 +486,36 @@ async fn main() -> anyhow::Result<()> { } let response = ARPPacket::construct(ARPMode::Reply, ARPProto::IPv4, &src_mac, &sendermac, &queryip.octets(), &senderip); - - Some(EthernetPacket::construct(src_mac, sendermac.try_into().unwrap(), &Layer3Packet::ARP(response))) + let resp2 = EthernetPacket::construct(src_mac, sendermac.try_into().unwrap(), &Layer3Packet::ARP(response)); + log::trace!("adding packet to send: ARP"); + packet_queue.push_back(resp2); }, _ => continue + }; + }, + Some(()) = recv_close.recv() => { + log::trace!("adding packet to send: TCP/IP close"); + let Ok(to_send) = close_connection(&mut socket) else { continue; }; + packet_queue.push_back(format_packet!(to_send)); + }, + Some(to_send) = send_packet.recv() => { + log::trace!("adding packet to send: TCP/IP message send"); + match send_data(&mut socket, to_send) { + Ok(pkts) => { + for (b, d) in pkts { + let pkt = format_packet!(b.build(srcip, ip, d, None)); + packet_queue.push_back(pkt); + } + } + _ => continue } }, - Some(to_send) = send_packet.recv() => { - match send_data(&mut socket, to_send) { - Ok(v) => v.map(|(b, d)| format_packet!(b.build(srcip, ip, d, None))), - Err(_) => continue - } - }, else => { continue; } - } else { - continue; - }; + } - _ = packets.sendpacket(packet.pkt()); + if let Some(packet) = packet_queue.pop_front() { + log::trace!("sending packet now ({packet:?})"); + _ = packets.sendpacket(packet.pkt()); + } } } diff --git a/tcp-test/sample-connection2.pcapng b/tcp-test/sample-connection2.pcapng new file mode 100644 index 0000000..40e3b12 Binary files /dev/null and b/tcp-test/sample-connection2.pcapng differ diff --git a/tcp-test/server.py b/tcp-test/server.py index 08a56c5..7ea5e71 100644 --- a/tcp-test/server.py +++ b/tcp-test/server.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 import socket +import fcntl +import array +import termios server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -12,7 +15,20 @@ server.listen(32) while True: client, addr = server.accept() - print('Got connection') - with client: - print(client.recv(24)) - client.sendall(b"pong") + print("Got connection") + try: + with client: + client.sendall(b"ping") + print("Message sent") + + buf = array.array("h", [0]) + fcntl.ioctl(client.fileno(), termios.TIOCOUTQ, buf) + while buf[0] != 0: + fcntl.ioctl(client.fileno(), termios.TIOCOUTQ, buf) + + print("Received message ack") + + # print(client.recv(24)) + except Exception as e: + client.close() + print(e)