feat: started to make a TCP state machine

This commit is contained in:
Andrew Rioux
2023-09-20 20:50:04 -04:00
parent f092548a8c
commit ed13defb07
3 changed files with 142 additions and 212 deletions

View File

@@ -1,18 +1,94 @@
use std::{net::Ipv4Addr, os::fd::AsRawFd, sync::Arc};
use std::{
collections::VecDeque,
net::{Ipv4Addr, SocketAddr},
};
use anyhow::anyhow;
use log::{debug, info, trace};
use nl_sys::{netlink, route};
use smoltcp::{
iface::{Config, Interface, Route, SocketSet},
phy::{wait as phy_wait, Device, Medium, RawSocket},
socket::tcp,
time::Instant,
wire::{EthernetAddress, IpAddress, IpCidr, Ipv4Address},
};
use tokio::io::{unix::AsyncFd, Interest};
use packets::{self, EthernetPacket, IPv4Pkt, TCPPacket, TCPPkt};
use pcap_sys;
struct PeerInfo {
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
local_port: u16,
}
#[derive(Default)]
struct TcpSocket {
tx_buffer: VecDeque<u8>,
rx_buffer: VecDeque<u8>,
local_seq_no: u32,
remote_seq_no: u32,
remote_last_ack: Option<u32>,
remote_last_win: u16,
local_rx_last_seq: Option<u32>,
local_rx_last_ack: Option<u32>,
state: TcpState,
peer_info: Option<PeerInfo>,
}
#[derive(Default)]
enum TcpState {
Listen,
SynSent,
SynReceived,
Established,
FinWait1,
FinWait2,
CloseWait,
Closing,
LastAck,
TimeWait,
#[default]
Closed,
}
fn socket_accepts_packet(socket: &TcpSocket, ip: IPv4Pkt<'_>, tcp: TCPPkt<'_>) -> bool {
if let Some(peer) = &socket.peer_info {
peer.local_addr == ip.dest_ip()
&& peer.remote_addr == ip.source_ip()
&& peer.local_port == tcp.dstport()
&& peer.remote_port == tcp.srcport()
} else {
false
}
}
fn connect(
socket: &mut TcpSocket,
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
) -> anyhow::Result<TCPPacket> {
socket.state = TcpState::SynSent;
let local_port: u16 = loop {
let port = rand::random();
if port > 40000 {
break port;
}
};
socket.peer_info = Some(PeerInfo {
remote_addr,
remote_port,
local_addr,
local_port,
});
Ok(())
}
fn process_packet(socket: &mut TcpSocket, packet: TCPPkt<'_>) -> anyhow::Result<Option<TCPPacket>> {
match socket.state {
TcpState::SynSent if packet.ack() && packet.syn() => {}
_ => {}
}
Ok(None)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -41,193 +117,7 @@ async fn main() -> anyhow::Result<()> {
(routes, res)
};
// dbg!((&ifname, srcip, src_mac, dst_mac));
let socket = TcpSocket::default();
let mut device = RawSocket::new(&ifname, Medium::Ethernet)?;
let mut config = Config::new(EthernetAddress(src_mac).into());
config.random_seed = rand::random();
let mut iface = Interface::new(config, &mut device, Instant::now());
iface.update_ip_addrs(|ip_addrs| {
let o = srcip.octets();
debug!(
"source network ip: {}.{}.{}.{}/{src_snmask}",
o[0], o[1], o[2], o[3]
);
ip_addrs
.push(IpCidr::new(
IpAddress::v4(o[0], o[1], o[2], o[3]),
src_snmask,
))
.unwrap();
});
for route in routes.iter() {
let Some(dst) = route.dst() else {
trace!("failed to get route destination");
continue;
};
let Some(hop) = route.hop_iter().next() else {
trace!("no next hop existed for {dst:?}");
continue;
};
if hop.ifindex() != ifindex {
trace!("hop doesn't match ifindex {ifindex}");
continue;
}
let Some(laddr) = hop.gateway() else {
trace!("couldn't get gateway address for {dst:?}");
continue;
};
if laddr.atype() != Some(libc::AF_INET) {
trace!("unable to load IP info for {dst:?}");
continue;
}
if dst.cidrlen() == 0 {
info!("setting default route via {:?}", &laddr);
iface
.routes_mut()
.add_default_ipv4_route(Ipv4Address::from_bytes(&laddr.hw_address()))?;
iface.routes_mut().update(|routes| {
let lip = laddr.hw_address();
_ = routes.push(Route {
cidr: IpCidr::new(IpAddress::v4(10, 0, 0, 0), 8),
via_router: IpAddress::v4(lip[0], lip[1], lip[2], lip[3]),
expires_at: None,
preferred_until: None,
});
_ = routes.push(Route {
cidr: IpCidr::new(IpAddress::v4(172, 16, 0, 0), 12),
via_router: IpAddress::v4(lip[0], lip[1], lip[2], lip[3]),
expires_at: None,
preferred_until: None,
});
});
} else {
let Some(raddr) = route.dst() else {
continue;
};
if raddr.atype() != Some(libc::AF_INET) {
continue;
}
let lip = laddr.hw_address();
let rip = raddr.hw_address();
info!(
"queueing adding {}.{}.{}.{}/{} via {}.{}.{}.{}",
rip[0],
rip[1],
rip[2],
rip[3],
dst.cidrlen(),
lip[0],
lip[1],
lip[2],
lip[3]
);
iface.routes_mut().update(|routes| {
info!(
"adding {}.{}.{}.{}/{} via {}.{}.{}.{}",
rip[0],
rip[1],
rip[2],
rip[3],
dst.cidrlen(),
lip[0],
lip[1],
lip[2],
lip[3]
);
_ = routes.push(Route {
cidr: IpCidr::new(
IpAddress::v4(rip[0], rip[1], rip[2], rip[3]),
dst.cidrlen() as u8,
),
via_router: IpAddress::v4(lip[0], lip[1], lip[2], lip[3]),
expires_at: None,
preferred_until: None,
})
});
}
}
debug!("routes added:");
iface.routes_mut().update(|r| {
for r in r {
debug!("\t{r:?}");
}
});
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
let mut sockets = SocketSet::new(vec![]);
let tcp_handle = sockets.add(tcp_socket);
let port = loop {
let port = rand::random::<u16>();
if port > 40_000 {
break port;
}
};
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
socket.connect(iface.context(), (ip, 54248), port)?;
let fd = device.as_raw_fd();
/*let interest = Interest::WRITABLE
.add(Interest::READABLE)
.add(Interest::ERROR)
.add(Interest::PRIORITY);
let afd = AsyncFd::with_interest(fd, interest)?;*/
let mut tcp_active = false;
loop {
let timestamp = Instant::now();
iface.poll(timestamp, &mut device, &mut sockets);
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
if socket.is_active() && !tcp_active {
info!("connected!");
tcp_active = true;
} else if !socket.is_active() && tcp_active {
info!("disconnected");
tcp_active = false;
}
tcp_active = socket.is_active();
if !socket.is_active() && !tcp_active {
socket.connect(iface.context(), (ip, 54248), port)?;
info!("connecting...");
}
if socket.can_send() {
socket.send_slice(b"ping")?;
info!("sent data!");
} else if socket.may_recv() {
socket.recv(|data| {
if !data.is_empty() {
match std::str::from_utf8(&data) {
Ok(s) => info!("Data received: {}", s),
Err(_) => info!("Data received: {:?}", data),
}
}
(data.len(), data)
})?;
}
phy_wait(fd, iface.poll_delay(timestamp, &sockets))?;
// drop(afd.ready(interest).await?);
}
Ok(())
}