From 24dff10b6b7621364dfe08bc7f05760e62fe22fb Mon Sep 17 00:00:00 2001 From: Andrew Rioux Date: Mon, 1 May 2023 12:40:12 -0400 Subject: [PATCH] feat: added AggregateInterface AggregateInterface can be used to gather multiple libpcap interfaces together in order to listen to all simultaneously and also selectively send on different interfaces --- Cargo.lock | 1 + examples/reverse-shell/beacon/src/main.rs | 26 +-- nl-sys/src/route.rs | 4 +- pcap-sys/Cargo.toml | 1 + pcap-sys/src/error.rs | 17 ++ pcap-sys/src/lib.rs | 219 +++++++++++++++++++++- 6 files changed, 246 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 703c123..1e51758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,6 +427,7 @@ dependencies = [ "futures", "libc", "tokio", + "tokio-stream", ] [[package]] diff --git a/examples/reverse-shell/beacon/src/main.rs b/examples/reverse-shell/beacon/src/main.rs index 3975135..00ce35c 100644 --- a/examples/reverse-shell/beacon/src/main.rs +++ b/examples/reverse-shell/beacon/src/main.rs @@ -1,6 +1,6 @@ use std::{net::Ipv4Addr, collections::HashMap}; -use anyhow::{anyhow, bail}; +use anyhow::anyhow; use tokio::time::{Duration, interval}; use tokio_stream::StreamExt; @@ -25,7 +25,7 @@ async fn main() -> anyhow::Result<()> { // let target = target.trim().parse::()?; let target: Ipv4Addr = "172.19.0.2".parse().unwrap(); - let (src_mac, dst_mac, srcip) = { + let (ifname, src_mac, dst_mac, srcip) = { let socket = netlink::Socket::new()?; let routes = socket.get_routes()?; @@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> { println!("link {:?}: {:?}, {}", link.name(), link.addr(), link.ifindex()); } - let (srcip, srcmac, dstmac) = route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, target) + let (ifname, srcip, srcmac, dstmac) = route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, target) .ok_or(anyhow!("unable to find a route to the IP"))?; /*let srcip = route::get_srcip_for_dstip(&routes, target) @@ -75,16 +75,10 @@ async fn main() -> anyhow::Result<()> { dbg!(srcip); dbg!(target); - ( srcmac, dstmac, srcip ) + ( ifname, srcmac, dstmac, srcip ) }; - let mut interfaces = pcap_sys::PcapDevIterator::new()?; - - let interface_name = interfaces - .find(|eth| eth.starts_with("eth")) - .ok_or(anyhow!("could not get an ethernet interface"))?; - - let mut interface = pcap_sys::Interface::::new(&interface_name)?; + let mut interface = pcap_sys::new_aggregate_interface(false)?; interface.set_buffer_size(8192)?; interface.set_non_blocking(true)?; @@ -95,12 +89,10 @@ async fn main() -> anyhow::Result<()> { interface.set_filter("inbound and udp port 54248", true, None)?; - if interface.datalink() != pcap_sys::consts::DLT_EN10MB { - bail!("interface does not support ethernet") - } + interface.prune(|_, interface| interface.datalink() == pcap_sys::consts::DLT_EN10MB); enum EventType { - Packet(Result), + Packet((String, Result)), Update } @@ -114,7 +106,7 @@ async fn main() -> anyhow::Result<()> { _ = update_interval.tick() => Some(EventType::Update) } { match evt { - EventType::Packet(Ok(pkt)) => { + EventType::Packet((_, Ok(pkt))) => { let eth_pkt = pkt.pkt(); let Ok(Layer3Pkt::IPv4Pkt(ip_pkt)) = eth_pkt.get_layer3_pkt() else { continue; }; let Ok(Layer4Pkt::UDP(udp_pkt)) = ip_pkt.get_layer4_packet() else { continue; }; @@ -139,7 +131,7 @@ async fn main() -> anyhow::Result<()> { let ip_packet = IPv4Packet::construct(srcip, target, &Layer4Packet::UDP(udp_packet)); let eth_packet = EthernetPacket::construct(src_mac, dst_mac, &Layer3Packet::IPv4(ip_packet)); - packets.sendpacket(eth_packet.pkt())?; + packets.sendpacket(&ifname, eth_packet.pkt())?; } _ => {} } diff --git a/nl-sys/src/route.rs b/nl-sys/src/route.rs index 69b00e1..3593a6f 100644 --- a/nl-sys/src/route.rs +++ b/nl-sys/src/route.rs @@ -129,7 +129,7 @@ impl From<*mut nl_object> for Link { } } -pub fn get_macs_and_src_for_ip(addrs: &Cache, routes: &Cache, neighs: &Cache, links: &Cache, addr: Ipv4Addr) -> Option<(Ipv4Addr, [u8; 6], [u8; 6])> { +pub fn get_macs_and_src_for_ip(addrs: &Cache, routes: &Cache, neighs: &Cache, links: &Cache, addr: Ipv4Addr) -> Option<(String, Ipv4Addr, [u8; 6], [u8; 6])> { let mut sorted_routes = routes.iter().collect::>(); sorted_routes.sort_by(|r1, r2| { @@ -173,6 +173,7 @@ pub fn get_macs_and_src_for_ip(addrs: &Cache, routes: &Cache, nei .find(|a| a.ifindex() == link.ifindex())?; Some(( + link.name(), (&srcip.local()?).try_into().ok()?, link.addr().hw_address().try_into().ok()?, neigh.lladdr().hw_address().try_into().ok()? @@ -316,6 +317,7 @@ impl Debug for Addr { f .debug_struct("Addr") .field("addr", &self.hw_address()) + .field("atype", &self.atype()) .finish() } } diff --git a/pcap-sys/Cargo.toml b/pcap-sys/Cargo.toml index 7171a73..488d487 100644 --- a/pcap-sys/Cargo.toml +++ b/pcap-sys/Cargo.toml @@ -23,6 +23,7 @@ errno = "0.2.8" futures = "0.3.25" libc = "0.2.142" tokio = { version = "1.21.2", features = ["net", "rt", "macros", "rt-multi-thread" ] } +tokio-stream = "0.1.14" [build-dependencies] cmake = "0.1" \ No newline at end of file diff --git a/pcap-sys/src/error.rs b/pcap-sys/src/error.rs index b819fbc..e0a21fb 100644 --- a/pcap-sys/src/error.rs +++ b/pcap-sys/src/error.rs @@ -22,6 +22,7 @@ use std::{ #[derive(Debug)] pub enum Error { PcapError(CString), + PcapErrorIf(String, CString), StringParse, UnknownPacketType(u16), PacketLengthInvalid, @@ -42,6 +43,13 @@ impl Display for Error { write!(f, "unknown pcap error") }, + Error::PcapErrorIf(ifname, err) => { + if let Ok(err_str) = std::str::from_utf8(err.as_bytes()) { + return write!(f, "pcap error on interface {ifname}: {err_str}"); + } + + write!(f, "unknown pcap error with interface {ifname}") + }, Error::StringParse => write!(f, "unable to parse a string from pcap"), Error::UnknownPacketType(ptype) => write!(f, "unknown packet type ({ptype})"), Error::PacketLengthInvalid => write!(f, "received a packet with a length that mismatched the header"), @@ -52,6 +60,15 @@ impl Display for Error { } } +impl Error { + pub fn add_ifname(self, ifname: &str) -> Self { + match self { + Error::PcapError(err) => Error::PcapErrorIf(ifname.to_string(), err), + other => other + } + } +} + impl error::Error for Error { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { diff --git a/pcap-sys/src/lib.rs b/pcap-sys/src/lib.rs index f4ac3d9..77dbf87 100644 --- a/pcap-sys/src/lib.rs +++ b/pcap-sys/src/lib.rs @@ -18,7 +18,7 @@ use std::{ os::fd::{AsRawFd, RawFd}, pin::Pin, ptr, slice, - task::{self, Poll}, + task::{self, Poll}, collections::HashMap, }; pub mod error; @@ -50,8 +50,9 @@ pub mod consts { } use ffi::PcapDevIf; -use futures::ready; +use futures::{ready, Stream, StreamExt}; use tokio::io::unix::AsyncFd; +use tokio_stream::StreamMap; pub struct PcapDevIterator { dev_if: *const PcapDevIf, @@ -288,8 +289,6 @@ impl Interface { } pub fn sendpacket(&self, packet: packets::EthernetPkt) -> error::Result<()> { - dbg!(packet.data.len()); - if unsafe { ffi::pcap_sendpacket( self.dev, @@ -470,3 +469,215 @@ impl futures::Stream for InterfaceStream { } } } + +pub fn new_aggregate_interface_filtered(crash: bool, mut f: F) -> error::Result> +where + F: FnMut(&str) -> bool +{ + let interfaces = if crash { + PcapDevIterator::new()? + .filter(|s| (f)(s)) + .map(|if_name| { + let new_name = if_name.clone(); + Interface::::new(&if_name) + .map(|interface| (if_name, interface)) + .map_err(|e| e.add_ifname(&new_name)) + }) + .collect::>>()? + } else { + PcapDevIterator::new()? + .filter(|s| (f)(s)) + .filter_map(|if_name| { + let new_name = if_name.clone(); + Interface::::new(&if_name) + .map(|interface| (if_name, interface)) + .ok() + }) + .collect::>() + }; + + Ok(AggregateInterface { + interfaces, + crash + }) +} + +pub fn new_aggregate_interface(crash: bool) -> error::Result> { + new_aggregate_interface_filtered(crash, |_| true) +} + +pub struct AggregateInterface { + interfaces: HashMap>, + crash: bool +} + +impl AggregateInterface { + pub fn set_non_blocking(&mut self, nonblocking: bool) -> error::Result<()> { + for (n, i) in self.interfaces.iter_mut() { + i + .set_non_blocking(nonblocking) + .map_err(|e| e.add_ifname(n))?; + } + + Ok(()) + } + + pub fn lookupnets(&self) -> error::Result> { + self.interfaces + .iter() + .map(|(name, interface)| { + interface + .lookupnet() + .map(|net| (&**name, net)) + .map_err(|e| e.add_ifname(&name)) + }) + .collect::>() + } +} + +impl AggregateInterface { + pub fn set_promisc(&mut self, promisc: bool) -> error::Result<()> { + for (n, i) in self.interfaces.iter_mut() { + i.set_promisc(promisc).map_err(|e| e.add_ifname(n))?; + } + + Ok(()) + } + + pub fn set_buffer_size(&mut self, bufsize: i32) -> error::Result<()> { + for (n, i) in self.interfaces.iter_mut() { + i.set_buffer_size(bufsize).map_err(|e| e.add_ifname(n))?; + } + + Ok(()) + } + + pub fn set_timeout(&mut self, timeout: i32) -> error::Result<()> { + for (n, i) in self.interfaces.iter_mut() { + i.set_timeout(timeout).map_err(|e| e.add_ifname(n))?; + } + + Ok(()) + } + + pub fn activate(self) -> error::Result> { + Ok(AggregateInterface { + interfaces: self.interfaces + .into_iter() + .map(|(name, interface)| { + let new_name = name.clone(); + interface + .activate() + .map(|interface| (name, interface)) + .map_err(|e| e.add_ifname(&new_name)) + }) + .collect::>()? + }) + } +} + +impl AggregateInterface { + pub fn datalinks(&self) -> HashMap<&str, i32> { + self.interfaces + .iter() + .map(|(name, interface)| { + (&**name, interface.datalink()) + }) + .collect::<_>() + } + + pub fn prune(&mut self, mut f: F) + where + F: FnMut(&str, &mut Interface) -> bool + { + let to_prune = self.interfaces + .iter_mut() + .filter_map(|(k,v)| { + if (f)(k, v) { + Some(k.clone()) + } else { + None + } + }) + .collect::>(); + + for name in to_prune { + self.interfaces.remove(&name); + } + } + + pub fn set_filter( + &mut self, + filter: &str, + optimize: bool, + mask: Option + ) -> error::Result>> { + self.interfaces + .iter_mut() + .map(|(name, interface)| { + interface.set_filter(filter, optimize, mask) + .map(|bpf| (&**name, bpf)) + .map_err(|e| e.add_ifname(&name)) + }) + .collect::>() + } + + pub fn sendpacket(&self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> { + if let Some(interface) = self.interfaces.get(ifname) { + interface.sendpacket(packet).map_err(|e| e.add_ifname(ifname))?; + } + + Ok(()) + } +} + +impl AggregateInterface { + pub fn stream(self) -> error::Result> { + Ok(AggregateInterfaceStream { + streams: self.interfaces + .into_iter() + .map(|(ifname, interface)| { + let new_name = ifname.clone(); + interface + .stream() + .map(|stream| (ifname, stream)) + .map_err(|e| e.add_ifname(&new_name)) + }) + .collect::>()? + }) + } +} + +pub struct AggregateInterfaceStream { + streams: StreamMap> +} + +impl AggregateInterfaceStream { + pub fn sendpacket(&mut self, ifname: &str, packet: packets::EthernetPkt) -> error::Result<()> { + if let Some(interface) = self.streams + .values_mut() + .find(|interface| { + let dev_name = interface.inner + .get_ref() + .interface + .dev_name + .clone(); + + CString::new(ifname).map(|ifname| + ifname == dev_name) + .unwrap_or(false) + }) { + interface.sendpacket(packet)?; + } + + Ok(()) + } +} + +impl futures::Stream for AggregateInterfaceStream { + type Item = (String, error::Result); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.streams.poll_next_unpin(cx) + } +} \ No newline at end of file