522 lines
16 KiB
Rust

use std::{
collections::VecDeque,
mem::MaybeUninit,
net::{Ipv4Addr, SocketAddr},
time::{Duration, Instant},
};
use anyhow::{anyhow, bail, Context};
use ringbuf::LocalRb;
use tokio::{sync::mpsc, task, time::sleep_until};
use tokio_stream::StreamExt;
use nl_sys::{netlink, route};
use packets::{
self, ARPMode, ARPPacket, ARPProto, EthernetPacket, IPv4Packet, IPv4Pkt, Layer3Packet,
Layer3Pkt, Layer4Packet, Layer4Pkt, TCPPacket, TCPPacketBuilder, TCPPkt,
};
use pcap_sys;
struct PeerInfo {
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
local_port: u16,
}
enum Timeout {
Idle { heartbeat: Option<Instant> },
Retransmit {},
}
#[derive(Default)]
struct TcpSocket {
tx_buffer: LocalRb<u8, [MaybeUninit<u8>; 65536]>,
rx_buffer: LocalRb<u8, [MaybeUninit<u8>; 65536]>,
first_ack: u32,
first_rack: u32,
local_seq_no: u32,
remote_seq_no: u32,
remote_last_ack: Option<u32>,
remote_last_win: u16,
local_rx_last_seq: Option<u32>,
local_rx_last_ack: Option<u32>,
state: TcpState,
peer_info: Option<PeerInfo>,
retransmit_timeout: Option<Duration>,
last_packet_received: Option<Instant>,
}
impl TcpSocket {}
#[derive(Default)]
enum TcpState {
Listen,
SynSent,
SynReceived,
Established,
FinWait1,
FinWait2,
CloseWait,
Closing,
LastAck,
TimeWait,
#[default]
Closed,
}
enum TcpOptions {
EndOfList,
NoOp,
MaxSegmentSize(u16),
}
impl TcpOptions {
fn get_bytes(&self) -> Vec<u8> {
match self {
Self::EndOfList => vec![0x00],
Self::NoOp => vec![0x01],
Self::MaxSegmentSize(size) => [&[0x02, 0x04][..], &size.to_be_bytes()].concat(),
}
}
}
fn socket_accepts_packet(socket: &TcpSocket, ip: &IPv4Pkt<'_>, tcp: &TCPPkt<'_>) -> bool {
if let Some(peer) = &socket.peer_info {
peer.local_addr == ip.dest_ip()
&& peer.remote_addr == ip.source_ip()
&& peer.local_port == tcp.dstport()
&& peer.remote_port == tcp.srcport()
} else {
false
}
}
async fn connect(
socket: &mut TcpSocket,
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
send_state_changed: &mpsc::Sender<TcpState>,
) -> (u16, TCPPacket) {
socket.state = TcpState::SynSent;
let local_port: u16 = loop {
let port = rand::random();
if port > 40000 {
break port;
}
};
let _ = send_state_changed.send(TcpState::SynSent).await;
socket.peer_info = Some(PeerInfo {
remote_addr,
remote_port,
local_addr,
local_port,
});
socket.local_seq_no = rand::random();
socket.first_ack = socket.local_seq_no;
let packet = TCPPacketBuilder::default()
.srcport(local_port)
.dstport(remote_port)
.syn(true)
.window(64240)
.seqnumber(socket.local_seq_no)
//.options([TcpOptions::MaxSegmentSize(64240).get_bytes()].concat())
.build(local_addr, remote_addr, vec![], None);
(local_port, packet)
}
fn process_timeout(socket: &mut TcpSocket) -> anyhow::Result<Option<(TCPPacketBuilder, Vec<u8>)>> {
Ok(None)
}
async fn process_packet(
socket: &mut TcpSocket,
packet: TCPPkt<'_>,
send_state_changed: &mpsc::Sender<TcpState>,
) -> anyhow::Result<(Vec<(TCPPacketBuilder, Vec<u8>)>, Option<Vec<u8>>)> {
match (&socket.state, &socket.peer_info) {
(TcpState::SynSent, Some(peer)) if packet.ack() && packet.syn() => {
log::debug!("established connection with peer");
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_seq_no = packet.seqnumber() + 1;
socket.remote_last_win = packet.window();
socket.first_rack = packet.seqnumber();
socket.local_seq_no += 1;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
socket.state = TcpState::Established;
let _ = send_state_changed.send(TcpState::Established).await;
Ok((vec![(tcppacketbuilder, vec![])], None))
}
(TcpState::Established, Some(peer)) if packet.fin() => {
log::debug!("Received fin!");
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_last_win = packet.window();
socket.remote_seq_no += 1;
socket.state = TcpState::CloseWait;
let _ = send_state_changed.send(TcpState::CloseWait).await;
let ackbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
socket.local_seq_no += 1;
let finbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
Ok((vec![(ackbuilder, vec![]), (finbuilder, vec![])], None))
}
(TcpState::Established, Some(peer)) if packet.ack() => {
log::debug!("received packet from server",);
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_last_win = packet.window();
socket.remote_seq_no += (packet.data().len() & 0xFFFFFFFF) as u32;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
if !packet.data().is_empty() {
log::debug!("received data: {:?}", packet.data());
}
Ok((
vec![(tcppacketbuilder, vec![])],
match packet.data() {
[] => None,
da => Some(da.to_owned()),
},
))
}
(TcpState::FinWait1, _) if packet.ack() => {
socket.state = TcpState::FinWait2;
let _ = send_state_changed.send(TcpState::FinWait2).await;
Ok((vec![], None))
}
(TcpState::FinWait2, Some(peer)) if packet.fin() => {
socket.state = TcpState::Closed;
let _ = send_state_changed.send(TcpState::Closed).await;
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_seq_no = packet.seqnumber() + 1;
socket.remote_last_win = packet.window();
socket.local_seq_no += 1;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
Ok((vec![(tcppacketbuilder, vec![])], None))
}
_ => Ok((vec![], None)),
}
}
fn resp_tcp(incoming: &TCPPkt<'_>) -> TCPPacketBuilder {
TCPPacketBuilder::default()
.srcport(incoming.dstport())
.dstport(incoming.srcport())
}
fn seq_tcp(socket: &TcpSocket, mut builder: TCPPacketBuilder) -> TCPPacketBuilder {
builder
}
fn recv_data(socket: &mut TcpSocket, data: &mut [u8]) -> anyhow::Result<usize> {
let (mut prod, mut cons) = socket.tx_buffer.split_ref();
let bytes_read = cons.pop_slice(data);
socket.remote_seq_no += (bytes_read & 0xFFFFFFFF) as u32;
Ok(bytes_read)
}
fn send_data(
socket: &mut TcpSocket,
data: Vec<u8>,
) -> anyhow::Result<Vec<(TCPPacketBuilder, Vec<u8>)>> {
let (mut prod, cons) = socket.tx_buffer.split_ref();
if cons.is_empty() {
_ = prod.push_iter(&mut data.into_iter());
Ok(vec![])
} else {
_ = prod.push_iter(&mut data.into_iter());
Ok(vec![])
}
}
fn close_connection(socket: &mut TcpSocket) -> anyhow::Result<TCPPacket> {
socket.state = TcpState::FinWait1;
let peer = socket
.peer_info
.as_ref()
.context("no connection to close")?;
Ok(TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.fin(true)
.ack(true)
.window(512)
.acknumber(
socket
.local_rx_last_ack
.context("information from synchronizing missing")?,
)
.seqnumber(socket.local_seq_no)
.build(peer.local_addr, peer.remote_addr, vec![], None))
}
struct TcpSocketHandle {
state_changed: mpsc::Receiver<TcpState>,
send_channel: mpsc::Sender<Vec<u8>>,
close: mpsc::Sender<()>,
receiver_channel: mpsc::Receiver<Vec<u8>>,
}
async fn use_socket(mut socket_handle: TcpSocketHandle) {
loop {
let state = socket_handle.state_changed.recv().await;
match state {
Some(TcpState::Established) => break,
_ => {}
}
}
log::info!("Connected to server!");
log::info!("disconnecting!");
/*_ = socket_handle.close.send(()).await;
loop {
let state = socket_handle.state_changed.recv().await;
match state {
Some(TcpState::Closed) => break,
_ => {}
}
}*/
log::info!("disconnected!");
match socket_handle.receiver_channel.recv().await {
Some(bytes) => match std::str::from_utf8(&bytes) {
Ok(string) => {
log::info!("received packet: {string}")
}
Err(_) => {
log::info!("received packet: {bytes:?}");
}
},
None => {
log::error!("could not get packets from server")
}
}
/*_ = socket_handle.send_channel.send(b"pong".to_vec()).await;
log::info!("Sent 'pong'!");*/
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Trace)
.with_module_level("tcp_test", log::LevelFilter::Trace)
.init()?;
let ip = std::env::args()
.skip(1)
.next()
.ok_or(anyhow!("could not get target IP"))?
.parse::<Ipv4Addr>()?;
let (ifname, _, srcip, src_mac, dst_mac, _) = {
let socket = netlink::Socket::new()?;
let routes = socket.get_routes()?;
let neighs = socket.get_neigh()?;
let links = socket.get_links()?;
let addrs = socket.get_addrs()?;
route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, ip)
.ok_or(anyhow!("unable to find a route to the IP"))?
};
let mut socket = TcpSocket::default();
let mut interface = pcap_sys::Interface::<pcap_sys::DevDisabled>::new(&ifname)?;
let srcip: u32 = srcip.into();
let srcip = srcip + 10;
let srcip: Ipv4Addr = srcip.into();
dbg!(srcip, src_mac, dst_mac);
// let dst_mac = [0x00, 0x16, 0x3e, 0xde, 0xa9, 0x93];
interface.set_buffer_size(8192)?;
interface.set_non_blocking(true)?;
interface.set_promisc(false)?;
interface.set_timeout(10)?;
let interface = interface.activate()?;
macro_rules! format_packet {
($tcp_packet:expr) => {{
let ippkt = IPv4Packet::construct(srcip, ip, &Layer4Packet::TCP($tcp_packet));
EthernetPacket::construct(src_mac, dst_mac, &Layer3Packet::IPv4(ippkt))
}};
}
let (send_state_changed, mut receive_state_changed) = mpsc::channel(64);
let (port, packet) = connect(&mut socket, ip, 54248, srcip, &send_state_changed).await;
_ = receive_state_changed.recv().await;
interface.set_filter(&format!("arp or (inbound and tcp port {port})"), true, None)?;
if interface.datalink() != pcap_sys::consts::DLT_EN10MB {
bail!("interface does not support ethernet");
}
let mut packets = interface.stream()?;
packets.sendpacket(format_packet!(packet).pkt())?;
let (send_channel, mut send_packet) = mpsc::channel(1024);
let (receive_packet, receiver_channel) = mpsc::channel(1024);
let (send_state_changed, state_changed) = mpsc::channel(1024);
let (close, mut recv_close) = mpsc::channel(16);
let socket_handle = TcpSocketHandle {
state_changed,
receiver_channel,
send_channel,
close,
};
task::spawn(async move { use_socket(socket_handle) });
let mut packet_queue = VecDeque::new();
loop {
let deadline = Instant::now()
+ socket
.retransmit_timeout
.unwrap_or_else(|| Duration::from_millis(10));
tokio::select! {
_ = sleep_until(deadline.into()) => {
let Ok(Some((b, d))) = process_timeout(&mut socket) else { continue; };
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
},
Some(Ok(bytes)) = packets.next() => {
let pkt = bytes.pkt();
match pkt.get_layer3_pkt() {
Ok(Layer3Pkt::IPv4Pkt(ip_pkt)) => {
let Ok(Layer4Pkt::TCP(tcp_pkt)) = ip_pkt.get_layer4_packet() else { continue; };
if !socket_accepts_packet(&socket, &ip_pkt, &tcp_pkt) {
continue;
}
let Ok((to_send, received)) = process_packet(&mut socket, tcp_pkt, &send_state_changed).await else { continue; };
if let Some(received) = received {
_ = receive_packet.send(received).await;
}
for (b, d) in to_send {
log::trace!("adding packet to send: {b:?}");
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
}
},
Ok(Layer3Pkt::ARP(arp)) => {
if arp.opcode() != 1 || arp.plen() != 4 || arp.hwlen() != 6 {
continue;
}
let senderip: [u8; 4] = arp.srcprotoaddr().try_into().unwrap();
let sendermac: &[u8] = arp.srchwaddr();
let queryip: [u8; 4] = arp.targetprotoaddr().try_into().unwrap();
let queryip: Ipv4Addr = queryip.into();
if queryip != srcip {
continue;
}
let response = ARPPacket::construct(ARPMode::Reply, ARPProto::IPv4, &src_mac, &sendermac, &queryip.octets(), &senderip);
let resp2 = EthernetPacket::construct(src_mac, sendermac.try_into().unwrap(), &Layer3Packet::ARP(response));
log::trace!("adding packet to send: ARP");
packet_queue.push_back(resp2);
},
_ => continue
};
},
Some(()) = recv_close.recv() => {
log::trace!("adding packet to send: TCP/IP close");
let Ok(to_send) = close_connection(&mut socket) else { continue; };
packet_queue.push_back(format_packet!(to_send));
},
Some(to_send) = send_packet.recv() => {
log::trace!("adding packet to send: TCP/IP message send");
match send_data(&mut socket, to_send) {
Ok(pkts) => {
for (b, d) in pkts {
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
}
}
_ => continue
}
},
else => { continue; }
}
if let Some(packet) = packet_queue.pop_front() {
log::trace!("sending packet now ({packet:?})");
_ = packets.sendpacket(packet.pkt());
}
}
}