feat: added tcp

sorry Judah
This commit is contained in:
Andrew Rioux
2025-02-12 17:49:31 -05:00
parent e388b2eefa
commit f9ff9f266a
37 changed files with 1939 additions and 902 deletions

View File

@@ -31,4 +31,8 @@ tokio = { version = "1.21.2", features = [
"rt-multi-thread",
] }
tokio-stream = "0.1.14"
packets = { path = "../packets" }
[target.'cfg(windows)'.dependencies]
windows = { version = "0.59.0", features = ["Win32_System_Threading"] }

View File

@@ -31,6 +31,7 @@ pub enum Error {
InvalidPcapFd,
Io(std::io::Error),
Libc(Errno),
IncorrectDeviceState(crate::State, crate::State),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -61,6 +62,10 @@ impl Display for Error {
Error::InvalidPcapFd => write!(f, "internal pcap file descriptor error"),
Error::Io(io) => write!(f, "std::io error ({io})"),
Error::Libc(err) => write!(f, "libc error ({err})"),
Error::IncorrectDeviceState(des, cur) => write!(
f,
"device in incorrect state (desired: {des:?}; current: {cur:?})"
),
}
}
}

View File

@@ -181,3 +181,8 @@ extern "C" {
pkt_data: *mut *mut c_char,
) -> c_int;
}
#[cfg(target_os = "windows")]
extern "C" {
pub fn pcap_getevent(p: *mut PcapDev) -> windows::Win32::Foundation::HANDLE;
}

View File

@@ -16,13 +16,12 @@
use std::{
ffi::{CStr, CString},
ptr, slice,
time::Duration,
};
pub mod error;
mod ffi;
pub use packets;
#[cfg(target_os = "linux")]
pub mod stream;
pub mod consts {
pub use super::ffi::{
@@ -102,37 +101,33 @@ impl std::iter::Iterator for PcapDevIterator {
}
}
pub trait State {}
pub trait Activated: State {}
pub trait NotListening: Activated {}
pub trait Listening: Activated {}
pub trait Disabled: State {}
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub enum State {
Disabled,
Activated,
Listening,
}
pub enum DevActivated {}
impl State for DevActivated {}
impl Activated for DevActivated {}
impl NotListening for DevActivated {}
pub enum DevDisabled {}
impl State for DevDisabled {}
impl Disabled for DevDisabled {}
pub enum DevListening {}
impl State for DevListening {}
impl Activated for DevListening {}
impl Listening for DevListening {}
impl State {
fn is_activated(&self) -> bool {
match self {
Self::Disabled => false,
Self::Activated | Self::Listening => true,
}
}
}
pub struct BpfProgram {}
pub struct Interface<T: State> {
pub struct Interface {
dev_name: CString,
dev: *mut ffi::PcapDev,
marker: std::marker::PhantomData<T>,
absorbed: bool,
nonblocking: bool,
state: State,
}
impl<T: State> Drop for Interface<T> {
impl Drop for Interface {
fn drop(&mut self) {
if !self.absorbed {
unsafe { ffi::pcap_close(self.dev) };
@@ -140,11 +135,21 @@ impl<T: State> Drop for Interface<T> {
}
}
unsafe impl<T: State> Send for Interface<T> {}
unsafe impl<T: State> Sync for Interface<T> {}
unsafe impl Send for Interface {}
unsafe impl Sync for Interface {}
impl<T: State> Interface<T> {
pub fn new(name: &str) -> error::Result<Interface<DevDisabled>> {
struct ListenHandler<'a, F>
where
F: FnMut(&Interface, packets::EthernetPkt) -> error::Result<bool>,
{
packet_handler: F,
break_on_fail: bool,
fail_error: Option<error::Error>,
interface: &'a Interface,
}
impl Interface {
pub fn new(name: &str) -> error::Result<Interface> {
let mut errbuf = [0i8; ffi::PCAP_ERRBUF_SIZE];
let dev_name = CString::new(name)?;
@@ -154,12 +159,12 @@ impl<T: State> Interface<T> {
Err(&errbuf)?;
}
Ok(Interface::<DevDisabled> {
Ok(Interface {
dev_name,
dev,
marker: std::marker::PhantomData,
absorbed: false,
nonblocking: false,
state: State::Disabled,
})
}
@@ -200,10 +205,15 @@ impl<T: State> Interface<T> {
pub fn name(&self) -> &str {
std::str::from_utf8(self.dev_name.as_bytes()).unwrap()
}
}
impl<T: Disabled> Interface<T> {
pub fn set_promisc(&mut self, promisc: bool) -> error::Result<()> {
if self.state != State::Disabled {
return Err(error::Error::IncorrectDeviceState(
State::Disabled,
self.state,
));
}
if unsafe { ffi::pcap_set_promisc(self.dev, i32::from(promisc)) } != 0 {
Err(unsafe { ffi::pcap_geterr(self.dev) })?;
}
@@ -212,6 +222,13 @@ impl<T: Disabled> Interface<T> {
}
pub fn set_buffer_size(&mut self, bufsize: i32) -> error::Result<()> {
if self.state != State::Disabled {
return Err(error::Error::IncorrectDeviceState(
State::Disabled,
self.state,
));
}
if unsafe { ffi::pcap_set_buffer_size(self.dev, bufsize) } != 0 {
Err(unsafe { ffi::pcap_geterr(self.dev) })?;
}
@@ -220,6 +237,13 @@ impl<T: Disabled> Interface<T> {
}
pub fn set_timeout(&mut self, timeout: i32) -> error::Result<()> {
if self.state != State::Disabled {
return Err(error::Error::IncorrectDeviceState(
State::Disabled,
self.state,
));
}
if unsafe { ffi::pcap_set_timeout(self.dev, timeout) } != 0 {
Err(unsafe { ffi::pcap_geterr(self.dev) })?;
}
@@ -227,26 +251,33 @@ impl<T: Disabled> Interface<T> {
Ok(())
}
pub fn activate(mut self) -> error::Result<Interface<DevActivated>> {
pub fn activate(&mut self) -> error::Result<()> {
if self.state != State::Disabled {
return Err(error::Error::IncorrectDeviceState(
State::Disabled,
self.state,
));
}
if unsafe { ffi::pcap_activate(self.dev) } != 0 {
Err(unsafe { ffi::pcap_geterr(self.dev) })?;
}
self.absorbed = true;
self.state = State::Activated;
Ok(Interface::<DevActivated> {
dev_name: self.dev_name.clone(),
dev: self.dev,
marker: std::marker::PhantomData,
absorbed: false,
nonblocking: self.nonblocking,
})
Ok(())
}
}
impl<T: Activated> Interface<T> {
pub fn datalink(&self) -> i32 {
unsafe { ffi::pcap_datalink(self.dev) }
pub fn datalink(&self) -> error::Result<i32> {
if !self.state.is_activated() {
return Err(error::Error::IncorrectDeviceState(
State::Activated,
self.state,
));
}
Ok(unsafe { ffi::pcap_datalink(self.dev) })
}
pub fn set_filter(
@@ -255,6 +286,13 @@ impl<T: Activated> Interface<T> {
optimize: bool,
mask: Option<u32>,
) -> error::Result<Box<ffi::BpfProgram>> {
if !self.state.is_activated() {
return Err(error::Error::IncorrectDeviceState(
State::Activated,
self.state,
));
}
let mut bpf = ffi::BpfProgram {
bf_len: 0,
bpf_insn: ptr::null(),
@@ -290,6 +328,13 @@ impl<T: Activated> Interface<T> {
}
pub fn sendpacket(&self, packet: packets::EthernetPkt) -> error::Result<()> {
if !self.state.is_activated() {
return Err(error::Error::IncorrectDeviceState(
State::Activated,
self.state,
));
}
if unsafe {
ffi::pcap_sendpacket(
self.dev,
@@ -304,12 +349,37 @@ impl<T: Activated> Interface<T> {
Ok(())
}
pub fn next_packet(&mut self) -> error::Result<packets::EthernetPacket> {
pub fn next_packet(&self) -> error::Result<packets::EthernetPacket> {
if !self.state.is_activated() {
return Err(error::Error::IncorrectDeviceState(
State::Activated,
self.state,
));
}
let mut header: *mut ffi::PktHeader = ptr::null_mut();
let mut data: *mut libc::c_char = ptr::null_mut();
if unsafe { ffi::pcap_next_ex(self.dev, &mut header as *mut _, &mut data as *mut _) < 1 } {
return unsafe { Err(ffi::pcap_geterr(self.dev))? };
let res =
unsafe { ffi::pcap_next_ex(self.dev, &mut header as *mut _, &mut data as *mut _) };
match res {
1 => {} // no problems
0 => {
// timeout
return Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"pcap timeout",
))
.map_err(Into::into);
}
-1 => {
// actual error
return unsafe { Err(ffi::pcap_geterr(self.dev)).map_err(Into::into) };
}
_ => {
panic!("Unrecognized value returned from pcap_next_ex");
}
}
let rdata = unsafe { slice::from_raw_parts(data as *mut u8, (*header).caplen as usize) };
@@ -319,34 +389,29 @@ impl<T: Activated> Interface<T> {
Ok(packets::EthernetPkt { data: rdata }.to_owned())
}
}
struct ListenHandler<'a, F>
where
F: FnMut(&Interface<DevListening>, packets::EthernetPkt) -> error::Result<bool>,
{
packet_handler: F,
break_on_fail: bool,
fail_error: Option<error::Error>,
interface: &'a Interface<DevListening>,
}
impl<T: NotListening> Interface<T> {
pub fn listen<F>(
&self,
packet_handler: F,
break_on_fail: bool,
packet_count: i32,
) -> (Option<error::Error>, i32)
) -> error::Result<(Option<error::Error>, i32)>
where
F: FnMut(&Interface<DevListening>, packets::EthernetPkt) -> error::Result<bool>,
F: FnMut(&Interface, packets::EthernetPkt) -> error::Result<bool>,
{
if self.state == State::Listening {
return Err(error::Error::IncorrectDeviceState(
State::Activated,
self.state,
));
}
unsafe extern "C" fn cback<F>(
user: *mut libc::c_void,
header: *const ffi::PktHeader,
data: *const u8,
) where
F: FnMut(&Interface<DevListening>, packets::EthernetPkt) -> error::Result<bool>,
F: FnMut(&Interface, packets::EthernetPkt) -> error::Result<bool>,
{
let info = &mut *(user as *mut ListenHandler<F>);
@@ -380,9 +445,9 @@ impl<T: NotListening> Interface<T> {
let interface = Interface {
dev_name: self.dev_name.clone(),
dev: self.dev,
marker: std::marker::PhantomData,
absorbed: true,
nonblocking: self.nonblocking,
state: State::Listening,
};
let mut info = ListenHandler::<F> {
@@ -401,20 +466,102 @@ impl<T: NotListening> Interface<T> {
)
};
(info.fail_error, count)
Ok((info.fail_error, count))
}
#[cfg(target_os = "linux")]
pub fn stream(mut self) -> error::Result<stream::InterfaceStream<DevActivated>> {
self.set_non_blocking(true)?;
#[cfg(target_os = "windows")]
pub fn get_wait_ready_callback(&self) -> WaitHandle {
let handle = unsafe { ffi::pcap_getevent(self.dev) };
WaitHandle(handle)
}
Ok(stream::InterfaceStream {
inner: tokio::io::unix::AsyncFd::with_interest(
stream::InternalInterfaceStream::<DevActivated>::new(unsafe {
std::mem::transmute(self)
})?,
tokio::io::Interest::READABLE,
)?,
})
#[cfg(not(target_os = "windows"))]
pub fn get_wait_ready_callback(&self) -> WaitHandle {
let fd = unsafe { ffi::pcap_get_selectable_fd(self.dev) };
WaitHandle(fd)
}
pub fn wait_ready(&self, timeout: Option<Duration>) -> error::Result<()> {
self.get_wait_ready_callback().wait(timeout)
}
}
#[cfg(windows)]
#[derive(Clone)]
pub struct WaitHandle(windows::Win32::Foundation::HANDLE);
#[cfg(unix)]
#[derive(Clone)]
pub struct WaitHandle(libc::c_int);
unsafe impl Send for WaitHandle {}
unsafe impl Sync for WaitHandle {}
impl WaitHandle {
#[cfg(windows)]
pub fn wait(&self, timeout: Option<Duration>) -> error::Result<()> {
use windows::Win32::System::Threading::{WaitForSingleObject, INFINITE};
let timeout = timeout
.map(|t| (t.as_millis() & 0xFFFFFFFF) as u32)
.unwrap_or(50);
unsafe {
if WaitForSingleObject(self.0, timeout).0 != 0 {
Err(std::io::Error::last_os_error()).map_err(Into::into)
} else {
Ok(())
}
}
}
#[cfg(unix)]
pub fn wait(&self, timeout: Option<Duration>) -> error::Result<()> {
unsafe {
use std::mem::MaybeUninit;
let mut readfds = {
let mut readfds = MaybeUninit::<libc::fd_set>::uninit();
libc::FD_ZERO(readfds.as_mut_ptr());
libc::FD_SET(self.0, readfds.as_mut_ptr());
readfds.assume_init()
};
let mut writefds = {
let mut writefds = MaybeUninit::<libc::fd_set>::uninit();
libc::FD_ZERO(writefds.as_mut_ptr());
libc::FD_SET(self.0, writefds.as_mut_ptr());
writefds.assume_init()
};
let mut exceptfds = {
let mut exceptfds = MaybeUninit::<libc::fd_set>::uninit();
libc::FD_ZERO(exceptfds.as_mut_ptr());
exceptfds.assume_init()
};
let mut c_timeout = libc::timeval {
tv_sec: 0,
tv_usec: 50_000,
};
if let Some(t) = timeout {
c_timeout.tv_sec = t.as_secs() as libc::time_t;
c_timeout.tv_usec = (t.as_micros() % 1_000_000) as libc::suseconds_t;
}
let res = libc::select(
1,
&mut readfds,
&mut writefds,
&mut exceptfds,
&mut c_timeout as *mut _,
);
if res == -1 {
Err(std::io::Error::last_os_error()).map_err(Into::into)
} else {
Ok(())
}
}
}
}

View File

@@ -1,343 +0,0 @@
// Copyright (C) 2023 Andrew Rioux
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
os::fd::{AsRawFd, RawFd},
pin::Pin,
task::{self, Poll},
};
use futures::{ready, StreamExt};
use tokio::io::unix::AsyncFd;
use tokio_stream::StreamMap;
use super::{
error, ffi, packets, Activated, DevActivated, DevDisabled, Disabled, Interface, NotListening,
PcapDevIterator, State,
};
pub(crate) struct InternalInterfaceStream<T: Activated> {
interface: Interface<T>,
fd: RawFd,
}
impl<T: Activated> InternalInterfaceStream<T> {
pub(crate) fn new(interface: Interface<T>) -> error::Result<InternalInterfaceStream<T>> {
let fd = unsafe { ffi::pcap_get_selectable_fd(interface.dev) };
if fd == -1 {
return Err(error::Error::InvalidPcapFd);
}
Ok(Self { interface, fd })
}
}
impl<T: Activated> AsRawFd for InternalInterfaceStream<T> {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
pub struct InterfaceStream<T: Activated> {
pub(crate) inner: AsyncFd<InternalInterfaceStream<T>>,
}
impl<T: Activated> InterfaceStream<T> {
pub fn sendpacket(&mut self, packet: packets::EthernetPkt) -> error::Result<()> {
self.inner.get_mut().interface.sendpacket(packet)
}
pub fn set_filter(
&mut self,
filter: &str,
optimize: bool,
mask: Option<u32>,
) -> error::Result<Box<ffi::BpfProgram>> {
self.inner
.get_mut()
.interface
.set_filter(filter, optimize, mask)
}
}
impl<T: Activated> Unpin for InterfaceStream<T> {}
impl<T: Activated> futures::Stream for InterfaceStream<T> {
type Item = error::Result<packets::EthernetPacket>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let stream = Pin::into_inner(self);
loop {
let mut guard = ready!(stream.inner.poll_read_ready_mut(cx))?;
match guard.try_io(|inner| match inner.get_mut().interface.next_packet() {
Ok(p) => Ok(Ok(p)),
Err(e) => Ok(Err(e)),
}) {
Ok(result) => {
return Poll::Ready(Some(result?));
}
Err(_would_block) => continue,
}
}
}
}
pub fn new_aggregate_interface_filtered<F>(
crash: bool,
mut f: F,
) -> error::Result<AggregateInterface<DevDisabled>>
where
F: FnMut(&str) -> bool,
{
let interfaces = if crash {
PcapDevIterator::new()?
.filter(|s| (f)(s))
.map(|if_name| {
let new_name = if_name.clone();
Interface::<DevDisabled>::new(&if_name)
.map(|interface| (if_name, interface))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<HashMap<_, _>>>()?
} else {
PcapDevIterator::new()?
.filter(|s| (f)(s))
.filter_map(|if_name| {
let new_name = if_name.clone();
Interface::<DevDisabled>::new(&if_name)
.map(|interface| (if_name, interface))
.ok()
.or_else(|| {
println!("{} failed to create device", new_name);
None
})
})
.collect::<HashMap<_, _>>()
};
Ok(AggregateInterface { interfaces, crash })
}
pub fn new_aggregate_interface(crash: bool) -> error::Result<AggregateInterface<DevDisabled>> {
new_aggregate_interface_filtered(crash, |_| true)
}
pub struct AggregateInterface<T: State> {
interfaces: HashMap<String, Interface<T>>,
crash: bool,
}
impl<T: State> AggregateInterface<T> {
pub fn set_non_blocking(&mut self, nonblocking: bool) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_non_blocking(nonblocking)
.map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn lookupnets(&self) -> error::Result<HashMap<&str, (u32, u32)>> {
self.interfaces
.iter()
.map(|(name, interface)| {
interface
.lookupnet()
.map(|net| (&**name, net))
.map_err(|e| e.add_ifname(&name))
})
.collect::<error::Result<_>>()
}
pub fn get_ifnames(&self) -> Vec<&str> {
self.interfaces.keys().map(|n| &**n).collect::<_>()
}
}
impl<T: Disabled> AggregateInterface<T> {
pub fn set_promisc(&mut self, promisc: bool) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_promisc(promisc).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn set_buffer_size(&mut self, bufsize: i32) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_buffer_size(bufsize).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn set_timeout(&mut self, timeout: i32) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_timeout(timeout).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn activate(self) -> error::Result<AggregateInterface<DevActivated>> {
Ok(AggregateInterface {
interfaces: if self.crash {
self.interfaces
.into_iter()
.map(|(name, interface)| {
let new_name = name.clone();
interface
.activate()
.map(|interface| (name, interface))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<_>>()?
} else {
self.interfaces
.into_iter()
.filter_map(|(name, interface)| {
let name_clone = name.clone();
interface
.activate()
.map(|interface| (name, interface))
.ok()
.or_else(|| {
println!("{} failed to activate", name_clone);
None
})
})
.collect::<_>()
},
crash: self.crash,
})
}
}
impl<T: Activated> AggregateInterface<T> {
pub fn datalinks(&self) -> HashMap<&str, i32> {
self.interfaces
.iter()
.map(|(name, interface)| (&**name, interface.datalink()))
.collect::<_>()
}
pub fn prune<F>(&mut self, mut f: F)
where
F: FnMut(&str, &mut Interface<T>) -> bool,
{
let to_prune = self
.interfaces
.iter_mut()
.filter_map(|(k, v)| if (f)(k, v) { Some(k.clone()) } else { None })
.collect::<Vec<_>>();
for name in to_prune {
self.interfaces.remove(&name);
}
}
pub fn set_filter(
&mut self,
filter: &str,
optimize: bool,
mask: Option<u32>,
) -> error::Result<HashMap<&str, Box<ffi::BpfProgram>>> {
if self.crash {
self.interfaces
.iter_mut()
.map(|(name, interface)| {
interface
.set_filter(filter, optimize, mask)
.map(|bpf| (&**name, bpf))
.map_err(|e| e.add_ifname(&name))
})
.collect::<error::Result<_>>()
} else {
Ok(self
.interfaces
.iter_mut()
.filter_map(|(name, interface)| {
let name_clone = name.clone();
interface
.set_filter(filter, optimize, mask)
.map(|bpf| (&**name, bpf))
.ok()
.or_else(|| {
println!("{} failed to set filter", name_clone);
None
})
})
.collect::<_>())
}
}
pub fn sendpacket(&self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> {
if let Some(interface) = self.interfaces.get(ifname) {
interface
.sendpacket(packet)
.map_err(|e| e.add_ifname(ifname))?;
}
Ok(())
}
}
impl<T: NotListening> AggregateInterface<T> {
pub fn stream(self) -> error::Result<AggregateInterfaceStream<DevActivated>> {
Ok(AggregateInterfaceStream {
streams: self
.interfaces
.into_iter()
.map(|(ifname, interface)| {
let new_name = ifname.clone();
interface
.stream()
.map(|stream| (ifname, stream))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<_>>()?,
})
}
}
pub struct AggregateInterfaceStream<T: Activated> {
streams: StreamMap<String, InterfaceStream<T>>,
}
impl<T: Activated> AggregateInterfaceStream<T> {
pub fn get_ifnames(&self) -> Vec<&str> {
self.streams.keys().map(|n| &**n).collect::<_>()
}
pub fn sendpacket(&mut self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> {
if let Some(interface) = self.streams.values_mut().find(|interface| {
interface.inner.get_ref().interface.dev_name.as_bytes() == ifname.as_bytes()
}) {
interface.sendpacket(packet)?;
}
Ok(())
}
}
impl<T: Activated> futures::Stream for AggregateInterfaceStream<T> {
type Item = (String, error::Result<packets::EthernetPacket>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.streams.poll_next_unpin(cx)
}
}