feat: added AggregateInterface

AggregateInterface can be used to gather multiple libpcap interfaces
together in order to listen to all simultaneously and also selectively
send on different interfaces
This commit is contained in:
Andrew Rioux 2023-05-01 12:40:12 -04:00
parent cfdf8f7e86
commit 24dff10b6b
6 changed files with 246 additions and 22 deletions

1
Cargo.lock generated
View File

@ -427,6 +427,7 @@ dependencies = [
"futures", "futures",
"libc", "libc",
"tokio", "tokio",
"tokio-stream",
] ]
[[package]] [[package]]

View File

@ -1,6 +1,6 @@
use std::{net::Ipv4Addr, collections::HashMap}; use std::{net::Ipv4Addr, collections::HashMap};
use anyhow::{anyhow, bail}; use anyhow::anyhow;
use tokio::time::{Duration, interval}; use tokio::time::{Duration, interval};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
@ -25,7 +25,7 @@ async fn main() -> anyhow::Result<()> {
// let target = target.trim().parse::<Ipv4Addr>()?; // let target = target.trim().parse::<Ipv4Addr>()?;
let target: Ipv4Addr = "172.19.0.2".parse().unwrap(); let target: Ipv4Addr = "172.19.0.2".parse().unwrap();
let (src_mac, dst_mac, srcip) = { let (ifname, src_mac, dst_mac, srcip) = {
let socket = netlink::Socket::new()?; let socket = netlink::Socket::new()?;
let routes = socket.get_routes()?; let routes = socket.get_routes()?;
@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
println!("link {:?}: {:?}, {}", link.name(), link.addr(), link.ifindex()); println!("link {:?}: {:?}, {}", link.name(), link.addr(), link.ifindex());
} }
let (srcip, srcmac, dstmac) = route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, target) let (ifname, srcip, srcmac, dstmac) = route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, target)
.ok_or(anyhow!("unable to find a route to the IP"))?; .ok_or(anyhow!("unable to find a route to the IP"))?;
/*let srcip = route::get_srcip_for_dstip(&routes, target) /*let srcip = route::get_srcip_for_dstip(&routes, target)
@ -75,16 +75,10 @@ async fn main() -> anyhow::Result<()> {
dbg!(srcip); dbg!(srcip);
dbg!(target); dbg!(target);
( srcmac, dstmac, srcip ) ( ifname, srcmac, dstmac, srcip )
}; };
let mut interfaces = pcap_sys::PcapDevIterator::new()?; let mut interface = pcap_sys::new_aggregate_interface(false)?;
let interface_name = interfaces
.find(|eth| eth.starts_with("eth"))
.ok_or(anyhow!("could not get an ethernet interface"))?;
let mut interface = pcap_sys::Interface::<pcap_sys::DevDisabled>::new(&interface_name)?;
interface.set_buffer_size(8192)?; interface.set_buffer_size(8192)?;
interface.set_non_blocking(true)?; interface.set_non_blocking(true)?;
@ -95,12 +89,10 @@ async fn main() -> anyhow::Result<()> {
interface.set_filter("inbound and udp port 54248", true, None)?; interface.set_filter("inbound and udp port 54248", true, None)?;
if interface.datalink() != pcap_sys::consts::DLT_EN10MB { interface.prune(|_, interface| interface.datalink() == pcap_sys::consts::DLT_EN10MB);
bail!("interface does not support ethernet")
}
enum EventType { enum EventType {
Packet(Result<EthernetPacket, pcap_sys::error::Error>), Packet((String, Result<EthernetPacket, pcap_sys::error::Error>)),
Update Update
} }
@ -114,7 +106,7 @@ async fn main() -> anyhow::Result<()> {
_ = update_interval.tick() => Some(EventType::Update) _ = update_interval.tick() => Some(EventType::Update)
} { } {
match evt { match evt {
EventType::Packet(Ok(pkt)) => { EventType::Packet((_, Ok(pkt))) => {
let eth_pkt = pkt.pkt(); let eth_pkt = pkt.pkt();
let Ok(Layer3Pkt::IPv4Pkt(ip_pkt)) = eth_pkt.get_layer3_pkt() else { continue; }; let Ok(Layer3Pkt::IPv4Pkt(ip_pkt)) = eth_pkt.get_layer3_pkt() else { continue; };
let Ok(Layer4Pkt::UDP(udp_pkt)) = ip_pkt.get_layer4_packet() else { continue; }; let Ok(Layer4Pkt::UDP(udp_pkt)) = ip_pkt.get_layer4_packet() else { continue; };
@ -139,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
let ip_packet = IPv4Packet::construct(srcip, target, &Layer4Packet::UDP(udp_packet)); let ip_packet = IPv4Packet::construct(srcip, target, &Layer4Packet::UDP(udp_packet));
let eth_packet = EthernetPacket::construct(src_mac, dst_mac, &Layer3Packet::IPv4(ip_packet)); let eth_packet = EthernetPacket::construct(src_mac, dst_mac, &Layer3Packet::IPv4(ip_packet));
packets.sendpacket(eth_packet.pkt())?; packets.sendpacket(&ifname, eth_packet.pkt())?;
} }
_ => {} _ => {}
} }

View File

@ -129,7 +129,7 @@ impl From<*mut nl_object> for Link {
} }
} }
pub fn get_macs_and_src_for_ip(addrs: &Cache<RtAddr>, routes: &Cache<Route>, neighs: &Cache<Neigh>, links: &Cache<Link>, addr: Ipv4Addr) -> Option<(Ipv4Addr, [u8; 6], [u8; 6])> { pub fn get_macs_and_src_for_ip(addrs: &Cache<RtAddr>, routes: &Cache<Route>, neighs: &Cache<Neigh>, links: &Cache<Link>, addr: Ipv4Addr) -> Option<(String, Ipv4Addr, [u8; 6], [u8; 6])> {
let mut sorted_routes = routes.iter().collect::<Vec<_>>(); let mut sorted_routes = routes.iter().collect::<Vec<_>>();
sorted_routes.sort_by(|r1, r2| { sorted_routes.sort_by(|r1, r2| {
@ -173,6 +173,7 @@ pub fn get_macs_and_src_for_ip(addrs: &Cache<RtAddr>, routes: &Cache<Route>, nei
.find(|a| a.ifindex() == link.ifindex())?; .find(|a| a.ifindex() == link.ifindex())?;
Some(( Some((
link.name(),
(&srcip.local()?).try_into().ok()?, (&srcip.local()?).try_into().ok()?,
link.addr().hw_address().try_into().ok()?, link.addr().hw_address().try_into().ok()?,
neigh.lladdr().hw_address().try_into().ok()? neigh.lladdr().hw_address().try_into().ok()?
@ -316,6 +317,7 @@ impl Debug for Addr {
f f
.debug_struct("Addr") .debug_struct("Addr")
.field("addr", &self.hw_address()) .field("addr", &self.hw_address())
.field("atype", &self.atype())
.finish() .finish()
} }
} }

View File

@ -23,6 +23,7 @@ errno = "0.2.8"
futures = "0.3.25" futures = "0.3.25"
libc = "0.2.142" libc = "0.2.142"
tokio = { version = "1.21.2", features = ["net", "rt", "macros", "rt-multi-thread" ] } tokio = { version = "1.21.2", features = ["net", "rt", "macros", "rt-multi-thread" ] }
tokio-stream = "0.1.14"
[build-dependencies] [build-dependencies]
cmake = "0.1" cmake = "0.1"

View File

@ -22,6 +22,7 @@ use std::{
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
PcapError(CString), PcapError(CString),
PcapErrorIf(String, CString),
StringParse, StringParse,
UnknownPacketType(u16), UnknownPacketType(u16),
PacketLengthInvalid, PacketLengthInvalid,
@ -42,6 +43,13 @@ impl Display for Error {
write!(f, "unknown pcap error") write!(f, "unknown pcap error")
}, },
Error::PcapErrorIf(ifname, err) => {
if let Ok(err_str) = std::str::from_utf8(err.as_bytes()) {
return write!(f, "pcap error on interface {ifname}: {err_str}");
}
write!(f, "unknown pcap error with interface {ifname}")
},
Error::StringParse => write!(f, "unable to parse a string from pcap"), Error::StringParse => write!(f, "unable to parse a string from pcap"),
Error::UnknownPacketType(ptype) => write!(f, "unknown packet type ({ptype})"), Error::UnknownPacketType(ptype) => write!(f, "unknown packet type ({ptype})"),
Error::PacketLengthInvalid => write!(f, "received a packet with a length that mismatched the header"), Error::PacketLengthInvalid => write!(f, "received a packet with a length that mismatched the header"),
@ -52,6 +60,15 @@ impl Display for Error {
} }
} }
impl Error {
pub fn add_ifname(self, ifname: &str) -> Self {
match self {
Error::PcapError(err) => Error::PcapErrorIf(ifname.to_string(), err),
other => other
}
}
}
impl error::Error for Error { impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> { fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self { match self {

View File

@ -18,7 +18,7 @@ use std::{
os::fd::{AsRawFd, RawFd}, os::fd::{AsRawFd, RawFd},
pin::Pin, pin::Pin,
ptr, slice, ptr, slice,
task::{self, Poll}, task::{self, Poll}, collections::HashMap,
}; };
pub mod error; pub mod error;
@ -50,8 +50,9 @@ pub mod consts {
} }
use ffi::PcapDevIf; use ffi::PcapDevIf;
use futures::ready; use futures::{ready, Stream, StreamExt};
use tokio::io::unix::AsyncFd; use tokio::io::unix::AsyncFd;
use tokio_stream::StreamMap;
pub struct PcapDevIterator { pub struct PcapDevIterator {
dev_if: *const PcapDevIf, dev_if: *const PcapDevIf,
@ -288,8 +289,6 @@ impl<T: Activated> Interface<T> {
} }
pub fn sendpacket(&self, packet: packets::EthernetPkt) -> error::Result<()> { pub fn sendpacket(&self, packet: packets::EthernetPkt) -> error::Result<()> {
dbg!(packet.data.len());
if unsafe { if unsafe {
ffi::pcap_sendpacket( ffi::pcap_sendpacket(
self.dev, self.dev,
@ -470,3 +469,215 @@ impl<T: Activated> futures::Stream for InterfaceStream<T> {
} }
} }
} }
pub fn new_aggregate_interface_filtered<F>(crash: bool, mut f: F) -> error::Result<AggregateInterface<DevDisabled>>
where
F: FnMut(&str) -> bool
{
let interfaces = if crash {
PcapDevIterator::new()?
.filter(|s| (f)(s))
.map(|if_name| {
let new_name = if_name.clone();
Interface::<DevDisabled>::new(&if_name)
.map(|interface| (if_name, interface))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<HashMap<_, _>>>()?
} else {
PcapDevIterator::new()?
.filter(|s| (f)(s))
.filter_map(|if_name| {
let new_name = if_name.clone();
Interface::<DevDisabled>::new(&if_name)
.map(|interface| (if_name, interface))
.ok()
})
.collect::<HashMap<_, _>>()
};
Ok(AggregateInterface {
interfaces,
crash
})
}
pub fn new_aggregate_interface(crash: bool) -> error::Result<AggregateInterface<DevDisabled>> {
new_aggregate_interface_filtered(crash, |_| true)
}
pub struct AggregateInterface<T: State> {
interfaces: HashMap<String, Interface<T>>,
crash: bool
}
impl<T: State> AggregateInterface<T> {
pub fn set_non_blocking(&mut self, nonblocking: bool) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i
.set_non_blocking(nonblocking)
.map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn lookupnets(&self) -> error::Result<HashMap<&str, (u32, u32)>> {
self.interfaces
.iter()
.map(|(name, interface)| {
interface
.lookupnet()
.map(|net| (&**name, net))
.map_err(|e| e.add_ifname(&name))
})
.collect::<error::Result<_>>()
}
}
impl<T: Disabled> AggregateInterface<T> {
pub fn set_promisc(&mut self, promisc: bool) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_promisc(promisc).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn set_buffer_size(&mut self, bufsize: i32) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_buffer_size(bufsize).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn set_timeout(&mut self, timeout: i32) -> error::Result<()> {
for (n, i) in self.interfaces.iter_mut() {
i.set_timeout(timeout).map_err(|e| e.add_ifname(n))?;
}
Ok(())
}
pub fn activate(self) -> error::Result<AggregateInterface<DevActivated>> {
Ok(AggregateInterface {
interfaces: self.interfaces
.into_iter()
.map(|(name, interface)| {
let new_name = name.clone();
interface
.activate()
.map(|interface| (name, interface))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<_>>()?
})
}
}
impl<T: Activated> AggregateInterface<T> {
pub fn datalinks(&self) -> HashMap<&str, i32> {
self.interfaces
.iter()
.map(|(name, interface)| {
(&**name, interface.datalink())
})
.collect::<_>()
}
pub fn prune<F>(&mut self, mut f: F)
where
F: FnMut(&str, &mut Interface<T>) -> bool
{
let to_prune = self.interfaces
.iter_mut()
.filter_map(|(k,v)| {
if (f)(k, v) {
Some(k.clone())
} else {
None
}
})
.collect::<Vec<_>>();
for name in to_prune {
self.interfaces.remove(&name);
}
}
pub fn set_filter(
&mut self,
filter: &str,
optimize: bool,
mask: Option<u32>
) -> error::Result<HashMap<&str, Box<ffi::BpfProgram>>> {
self.interfaces
.iter_mut()
.map(|(name, interface)| {
interface.set_filter(filter, optimize, mask)
.map(|bpf| (&**name, bpf))
.map_err(|e| e.add_ifname(&name))
})
.collect::<error::Result<_>>()
}
pub fn sendpacket(&self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> {
if let Some(interface) = self.interfaces.get(ifname) {
interface.sendpacket(packet).map_err(|e| e.add_ifname(ifname))?;
}
Ok(())
}
}
impl<T: NotListening> AggregateInterface<T> {
pub fn stream(self) -> error::Result<AggregateInterfaceStream<DevActivated>> {
Ok(AggregateInterfaceStream {
streams: self.interfaces
.into_iter()
.map(|(ifname, interface)| {
let new_name = ifname.clone();
interface
.stream()
.map(|stream| (ifname, stream))
.map_err(|e| e.add_ifname(&new_name))
})
.collect::<error::Result<_>>()?
})
}
}
pub struct AggregateInterfaceStream<T: Activated> {
streams: StreamMap<String, InterfaceStream<T>>
}
impl<T: Activated> AggregateInterfaceStream<T> {
pub fn sendpacket(&mut self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> {
if let Some(interface) = self.streams
.values_mut()
.find(|interface| {
let dev_name = interface.inner
.get_ref()
.interface
.dev_name
.clone();
CString::new(ifname).map(|ifname|
ifname == dev_name)
.unwrap_or(false)
}) {
interface.sendpacket(packet)?;
}
Ok(())
}
}
impl<T: Activated> futures::Stream for AggregateInterfaceStream<T> {
type Item = (String, error::Result<packets::EthernetPacket>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.streams.poll_next_unpin(cx)
}
}