chore: sync konduit-platform

This commit is contained in:
Eugen Kaparulin
2026-06-08 09:11:15 +03:00
parent ee7898cfac
commit d3e6d89b6b
12 changed files with 2070 additions and 0 deletions

View File

@@ -0,0 +1,383 @@
use anyhow::Result;
use chacha20poly1305::{
aead::{Aead, KeyInit},
ChaCha20Poly1305, Key, Nonce,
};
use rand::rngs::OsRng;
use x25519_dalek::{PublicKey, StaticSecret};
pub struct Cipher {
inner: ChaCha20Poly1305,
}
impl Cipher {
pub fn new(key_bytes: &[u8; 32]) -> Self {
let key = Key::from_slice(key_bytes);
Self {
inner: ChaCha20Poly1305::new(key),
}
}
pub fn encrypt(&self, nonce: &[u8; 12], plaintext: &[u8]) -> Result<Vec<u8>> {
let nonce = Nonce::from_slice(nonce);
self.inner
.encrypt(nonce, plaintext)
.map_err(|_e| anyhow::anyhow!("Encryption error"))
}
pub fn decrypt(&self, nonce: &[u8; 12], ciphertext: &[u8]) -> Result<Vec<u8>> {
let nonce = Nonce::from_slice(nonce);
self.inner
.decrypt(nonce, ciphertext)
.map_err(|_e| anyhow::anyhow!("Decryption error"))
}
}
// X25519 Key Exchange for Session Encryption
pub struct KeyExchange {
secret: StaticSecret,
}
impl KeyExchange {
pub fn new() -> Self {
Self {
secret: StaticSecret::random_from_rng(OsRng),
}
}
pub fn from_secret_bytes(bytes: [u8; 32]) -> Self {
Self {
secret: StaticSecret::from(bytes),
}
}
pub fn public_key_bytes(&self) -> [u8; 32] {
let pub_key = PublicKey::from(&self.secret);
*pub_key.as_bytes()
}
pub fn exchange(&self, peer_public: &[u8; 32]) -> [u8; 32] {
let peer_pub = PublicKey::from(*peer_public);
let shared = self.secret.diffie_hellman(&peer_pub);
*shared.as_bytes()
}
pub fn as_bytes(&self) -> [u8; 32] {
self.secret.to_bytes()
}
}
// Ed25519 Identity Keys for Micro-CA Signatures
use ed25519_dalek::{Signer, Verifier};
pub struct IdentityKey {
keypair: ed25519_dalek::SigningKey,
}
impl IdentityKey {
pub fn generate() -> Self {
use rand::RngCore;
let mut bytes = [0u8; 32];
OsRng.fill_bytes(&mut bytes);
Self {
keypair: ed25519_dalek::SigningKey::from_bytes(&bytes),
}
}
pub fn from_bytes(bytes: &[u8; 32]) -> Self {
Self {
keypair: ed25519_dalek::SigningKey::from_bytes(bytes),
}
}
pub fn public_key_bytes(&self) -> [u8; 32] {
self.keypair.verifying_key().to_bytes()
}
pub fn sign(&self, message: &[u8]) -> [u8; 64] {
self.keypair.sign(message).to_bytes()
}
pub fn as_bytes(&self) -> [u8; 32] {
self.keypair.to_bytes()
}
}
pub struct IdentityVerifier;
impl IdentityVerifier {
pub fn verify(pub_key: &[u8; 32], message: &[u8], signature: &[u8; 64]) -> Result<()> {
let key = ed25519_dalek::VerifyingKey::from_bytes(pub_key)
.map_err(|_| anyhow::anyhow!("Invalid Ed25519 public key"))?;
let sig = ed25519_dalek::Signature::from_bytes(signature);
key.verify(message, &sig)
.map_err(|_| anyhow::anyhow!("Signature verification failed"))
}
}
// ── Key Derivation (Brain Wallet) ────────────────────────────────────────────
//
// Algorithm: Argon2id (memory-hard, side-channel resistant)
// All improvements over the original PBKDF2 path:
// 1. Memory-hard KDF (GPU/ASIC resistance)
// 2. Salt is 32 cryptographically random bytes (not a string)
// 3. Passphrase is NFKC-normalised + trimmed before hashing
// 4. KDF parameters are stored alongside derived data (recoverability)
// 5. Format version field enables future algorithm agility
// 6. Returns Result<> — no panics in crypto code
use argon2::{Algorithm, Argon2, Params, Version};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use unicode_normalization::UnicodeNormalization;
/// Versioned KDF parameters that must be stored alongside any data protected
/// by a key derived from a mantra. They are everything needed to reproduce the
/// exact same key from the same mantra on any platform, now or in the future.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct KdfParams {
/// Format version (currently 2). Increment when changing algorithm or
/// parameter semantics so old configs can still be read.
pub version: u8,
/// KDF algorithm name — always "argon2id" for version 2.
pub algorithm: String,
/// Memory cost in KiB (262144 = 256 MiB).
pub m_cost: u32,
/// Number of iterations / time cost.
pub t_cost: u32,
/// Degree of parallelism.
pub p_cost: u32,
/// 32 cryptographically-random bytes, hex-encoded. Never derived from user
/// input. Must be stored; losing it makes the key irrecoverable.
pub salt_hex: String,
}
impl KdfParams {
/// Default secure parameters: RFC 9106 "interactive" profile.
/// ~256 MiB RAM, 3 passes, 4 threads — takes ≈13 s on typical hardware.
pub const DEFAULT_M_COST: u32 = 262_144; // 256 MiB
pub const DEFAULT_T_COST: u32 = 3;
pub const DEFAULT_P_COST: u32 = 4;
/// Create new v2 params with the given pre-generated random salt.
pub fn new_v2(salt: [u8; 32]) -> Self {
Self {
version: 2,
algorithm: "argon2id".to_string(),
m_cost: Self::DEFAULT_M_COST,
t_cost: Self::DEFAULT_T_COST,
p_cost: Self::DEFAULT_P_COST,
salt_hex: hex::encode(salt),
}
}
}
pub struct KeyGenerator;
impl KeyGenerator {
/// Generate a fresh [`KdfParams`] with a cryptographically-random 32-byte
/// salt. Call this once per bootstrap; then store the params in `server.toml`.
pub fn generate_kdf_params() -> KdfParams {
let mut salt = [0u8; 32];
OsRng.fill_bytes(&mut salt);
KdfParams::new_v2(salt)
}
/// Derive a 32-byte master key from a human passphrase (mantra).
///
/// ## What this does
/// 1. **NFKC-normalise** the passphrase and trim surrounding whitespace so
/// that different Unicode representations of the same text always hash
/// identically (critical for cross-platform recovery).
/// 2. Decode the random salt from `params.salt_hex`.
/// 3. Run **Argon2id** with the stored memory/time/parallelism parameters.
///
/// Returns `Err` on invalid params or hex — never panics.
pub fn derive_from_passphrase(passphrase: &str, params: &KdfParams) -> Result<[u8; 32]> {
// Step 1: NFKC normalise + trim (cross-platform / cross-keyboard safety)
let normalised: String = passphrase.nfkc().collect::<String>().trim().to_string();
if normalised.is_empty() {
anyhow::bail!("Passphrase must not be empty after normalisation");
}
// Step 2: Decode the stored random salt
let salt_bytes = hex::decode(&params.salt_hex)
.map_err(|e| anyhow::anyhow!("Invalid salt hex: {}", e))?;
// Step 3: Build Argon2id with stored parameters
let argon2_params = Params::new(params.m_cost, params.t_cost, params.p_cost, Some(32))
.map_err(|e| anyhow::anyhow!("Invalid Argon2 parameters: {}", e))?;
let argon2 = Argon2::new(Algorithm::Argon2id, Version::V0x13, argon2_params);
let mut key = [0u8; 32];
argon2
.hash_password_into(normalised.as_bytes(), &salt_bytes, &mut key)
.map_err(|e| anyhow::anyhow!("Argon2id derivation failed: {}", e))?;
Ok(key)
}
/// **Legacy v1 path — signature preserved for migration tooling.**
///
/// The pbkdf2/hmac/sha2 crates have been removed. To re-enable this for a
/// migration binary, add those deps back and replace this body.
#[allow(dead_code)]
pub fn derive_from_passphrase_v1_legacy(_passphrase: &str, _salt: &str) -> ! {
panic!(
"Legacy PBKDF2 path is not compiled in. \
Add pbkdf2/hmac/sha2 deps and restore the body to use this."
)
}
}
pub struct SessionKeys {
pub server_to_client: [u8; 32],
pub client_to_server: [u8; 32],
}
impl SessionKeys {
pub fn derive(shared_secret: &[u8; 32], client_pub: &[u8; 32], server_pub: &[u8; 32]) -> Self {
// Simple HKDF-like derivation using BLAKE3
let mut context = Vec::new();
context.extend_from_slice(client_pub);
context.extend_from_slice(server_pub);
let mut hasher = blake3::Hasher::new_derive_key("konduit-s2c");
hasher.update(shared_secret);
hasher.update(&context);
let mut s2c = [0u8; 32];
s2c.copy_from_slice(hasher.finalize().as_bytes());
let mut hasher = blake3::Hasher::new_derive_key("konduit-c2s");
hasher.update(shared_secret);
hasher.update(&context);
let mut c2s = [0u8; 32];
c2s.copy_from_slice(hasher.finalize().as_bytes());
SessionKeys {
server_to_client: s2c,
client_to_server: c2s,
}
}
}
pub struct NonceCounter {
counter: u64,
}
impl NonceCounter {
pub fn new() -> Self {
Self { counter: 0 }
}
pub fn next(&mut self) -> [u8; 12] {
let mut nonce = [0u8; 12];
// 4 bytes random prefix (salt) to prevent collision on restart?
// For now just 12 bytes counter le
nonce[0..8].copy_from_slice(&self.counter.to_le_bytes());
self.counter += 1;
nonce
}
}
/// Compute HMAC-like MAC using BLAKE3 for authentication
pub fn compute_mac(key: &[u8], message: &[u8]) -> Result<[u8; 32]> {
use blake3::Hasher;
let key_32: [u8; 32] = key
.try_into()
.map_err(|_| anyhow::anyhow!("Mac key must be exactly 32 bytes (64 hex characters)"))?;
let mut hasher = Hasher::new_keyed(&key_32);
hasher.update(message);
Ok(*hasher.finalize().as_bytes())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hmac_deterministic() {
let key = b"test-key-32-bytes-long-padding!!";
let message = b"hello world";
let hmac1 = compute_mac(key, message).unwrap();
let hmac2 = compute_mac(key, message).unwrap();
assert_eq!(hmac1, hmac2, "HMAC should be deterministic");
}
#[test]
fn test_hmac_different_keys() {
let key1 = b"key1-32-bytes-long-padding!!!!!!";
let key2 = b"key2-32-bytes-long-padding!!!!!!";
let message = b"hello world";
let hmac1 = compute_mac(key1, message).unwrap();
let hmac2 = compute_mac(key2, message).unwrap();
assert_ne!(
hmac1, hmac2,
"Different keys should produce different HMACs"
);
}
#[test]
fn test_kdf_deterministic() {
let mantra = "my secret server mantra";
let params = KdfParams::new_v2([42u8; 32]);
let key1 = KeyGenerator::derive_from_passphrase(mantra, &params).unwrap();
let key2 = KeyGenerator::derive_from_passphrase(mantra, &params).unwrap();
assert_eq!(
key1, key2,
"Derivation should be exactly deterministic for the same mantra and params"
);
}
#[test]
fn test_kdf_different_salts() {
let mantra = "my secret server mantra";
let params1 = KdfParams::new_v2([1u8; 32]);
let params2 = KdfParams::new_v2([2u8; 32]);
let key1 = KeyGenerator::derive_from_passphrase(mantra, &params1).unwrap();
let key2 = KeyGenerator::derive_from_passphrase(mantra, &params2).unwrap();
assert_ne!(key1, key2, "Different salts must produce different keys");
}
#[test]
fn test_kdf_different_mantras() {
let params = KdfParams::new_v2([42u8; 32]);
let key1 = KeyGenerator::derive_from_passphrase("mantra A", &params).unwrap();
let key2 = KeyGenerator::derive_from_passphrase("mantra B", &params).unwrap();
assert_ne!(key1, key2, "Different mantras must produce different keys");
}
#[test]
fn test_kdf_nfkc_normalization() {
// "é" can be represented as a single code point (U+00E9) or two (U+0065, U+0301)
let composed = "stréssed";
let decomposed = "stre\u{0301}ssed";
let params = KdfParams::new_v2([42u8; 32]);
let key1 = KeyGenerator::derive_from_passphrase(composed, &params).unwrap();
let key2 = KeyGenerator::derive_from_passphrase(decomposed, &params).unwrap();
assert_eq!(
key1, key2,
"NFKC normalization must ensure visually identical mantras yield the same key"
);
// Also test leading/trailing whitespace trimming
let trimmed = KeyGenerator::derive_from_passphrase(" stréssed \n ", &params).unwrap();
assert_eq!(key1, trimmed, "Whitespace should be trimmed securely");
}
}

View File

@@ -0,0 +1,226 @@
use super::DnsState;
use anyhow::{Context, Result};
use std::net::IpAddr;
use tracing::{info, warn};
// ---------------------------------------------------------------------------
// systemd-resolved D-Bus proxy
// ---------------------------------------------------------------------------
#[zbus::proxy(
interface = "org.freedesktop.resolve1.Manager",
default_service = "org.freedesktop.resolve1",
default_path = "/org/freedesktop/resolve1"
)]
trait Resolve1Manager {
// a(iay): array of (address_family, raw_bytes)
#[zbus(name = "SetLinkDNS")]
fn set_link_dns(
&self,
ifindex: i32,
addresses: Vec<(i32, Vec<u8>)>,
) -> zbus::Result<()>;
// a(sb): array of (domain, is_routing_domain)
fn set_link_domains(
&self,
ifindex: i32,
domains: Vec<(String, bool)>,
) -> zbus::Result<()>;
fn revert_link(&self, ifindex: i32) -> zbus::Result<()>;
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
pub async fn set_dns(
interface: &str,
if_index: Option<u32>,
dns_servers: &[IpAddr],
_assigned_ip: IpAddr,
state: &mut DnsState,
) -> Result<()> {
if dns_servers.is_empty() {
return Ok(());
}
// Prefer in-process D-Bus (no polkit when caller has CAP_NET_ADMIN).
// Fall back to spawning resolvectl for environments without systemd-resolved.
let result = match if_index {
Some(idx) => set_dns_dbus(idx, dns_servers).await,
None => {
warn!("Interface index unknown, falling back to resolvectl subprocess");
set_dns_resolvectl(interface, dns_servers).await
}
};
match result {
Ok(_) => {
state.configured = true;
}
Err(e) => {
warn!(
"D-Bus DNS config failed ({:#}), trying resolvectl subprocess",
e
);
match set_dns_resolvectl(interface, dns_servers).await {
Ok(_) => {
state.configured = true;
}
Err(e2) => {
warn!("Could not configure DNS: {}", e2);
warn!("VPN DNS servers: {:?}", dns_servers);
warn!(
"Configure manually: resolvectl dns {} {}",
interface,
dns_servers
.iter()
.map(|ip| ip.to_string())
.collect::<Vec<_>>()
.join(" ")
);
}
}
}
}
Ok(())
}
pub async fn restore(
interface: &str,
if_index: Option<u32>,
state: &mut DnsState,
) -> Result<()> {
if !state.configured {
return Ok(());
}
info!("Restoring DNS settings...");
let result = match if_index {
Some(idx) => revert_link_dbus(idx).await,
None => revert_resolvectl(interface).await,
};
if let Err(e) = result {
warn!("D-Bus DNS revert failed ({}), trying resolvectl", e);
revert_resolvectl(interface).await.ok();
}
state.configured = false;
Ok(())
}
// ---------------------------------------------------------------------------
// D-Bus implementation (no subprocess, no polkit with CAP_NET_ADMIN)
// ---------------------------------------------------------------------------
async fn set_dns_dbus(if_index: u32, dns_servers: &[IpAddr]) -> Result<()> {
let conn = zbus::Connection::system()
.await
.context("Failed to connect to D-Bus system bus")?;
let proxy = Resolve1ManagerProxy::new(&conn)
.await
.context("Failed to create resolve1 proxy")?;
// Encode addresses as (address_family: i32, raw_bytes: Vec<u8>)
let addrs: Vec<(i32, Vec<u8>)> = dns_servers
.iter()
.map(|ip| match ip {
IpAddr::V4(v4) => (2i32, v4.octets().to_vec()),
IpAddr::V6(v6) => (10i32, v6.octets().to_vec()),
})
.collect();
proxy
.set_link_dns(if_index as i32, addrs)
.await
.context("SetLinkDNS failed")?;
// Route all queries through the VPN link.
// D-Bus SetLinkDomains takes (domain, is_routing_only): pass root domain "."
// with is_routing_only=true, which systemd-resolved displays as "~." and marks
// the link as DefaultRoute:yes. Passing "~." with true double-flags it and
// results in a malformed domain that shows as DefaultRoute:no.
proxy
.set_link_domains(if_index as i32, vec![(".".to_string(), true)])
.await
.context("SetLinkDomains failed")?;
info!(
"DNS configured via D-Bus for index {}: {}",
if_index,
dns_servers
.iter()
.map(|ip| ip.to_string())
.collect::<Vec<_>>()
.join(", ")
);
Ok(())
}
async fn revert_link_dbus(if_index: u32) -> Result<()> {
let conn = zbus::Connection::system()
.await
.context("Failed to connect to D-Bus system bus")?;
let proxy = Resolve1ManagerProxy::new(&conn)
.await
.context("Failed to create resolve1 proxy")?;
proxy
.revert_link(if_index as i32)
.await
.context("RevertLink failed")?;
Ok(())
}
// ---------------------------------------------------------------------------
// resolvectl subprocess fallback
// ---------------------------------------------------------------------------
async fn set_dns_resolvectl(interface: &str, dns_servers: &[IpAddr]) -> Result<()> {
let dns_strs: Vec<String> = dns_servers.iter().map(|ip| ip.to_string()).collect();
let status = tokio::process::Command::new("resolvectl")
.arg("dns")
.arg(interface)
.args(&dns_strs)
.status()
.await
.context("resolvectl not found")?;
if !status.success() {
anyhow::bail!("resolvectl dns failed");
}
let status = tokio::process::Command::new("resolvectl")
.args(["domain", interface, "~."])
.status()
.await
.context("resolvectl domain failed")?;
if !status.success() {
anyhow::bail!("resolvectl domain failed");
}
info!(
"DNS configured via resolvectl for {}: {}",
interface,
dns_strs.join(", ")
);
Ok(())
}
async fn revert_resolvectl(interface: &str) -> Result<()> {
let _ = tokio::process::Command::new("resolvectl")
.args(["revert", interface])
.status()
.await;
Ok(())
}

View File

@@ -0,0 +1,23 @@
use super::DnsState;
use anyhow::Result;
use std::net::IpAddr;
pub async fn set_dns(interface: &str, dns_servers: &[IpAddr], state: &mut DnsState) -> Result<()> {
tracing::info!(
"Setting DNS on macOS for interface {}: {:?}",
interface,
dns_servers
);
// macOS implementation usually requires SystemConfiguration framework.
// We would use `SCPreferences` to edit the network service associated with the interface.
// Placeholder logic
state.configured = true;
Ok(())
}
pub async fn restore(_interface: &str, _state: &mut DnsState) -> Result<()> {
tracing::info!("Restoring DNS on macOS...");
Ok(())
}

View File

@@ -0,0 +1,102 @@
use anyhow::Result;
use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
#[cfg(target_os = "windows")]
mod windows;
/// Cross-platform DNS configuration manager
pub struct DnsManager {
/// Interface name or identifier
interface: String,
/// Interface index (if available/relevant)
if_index: Option<u32>,
/// IP address assigned to the TUN interface
assigned_ip: IpAddr,
/// State to track if we modified DNS
state: Arc<Mutex<DnsState>>,
}
#[derive(Debug, Default)]
struct DnsState {
configured: bool,
_original_dns: Vec<IpAddr>,
}
impl DnsManager {
/// Create a new DNS manager for a specific interface
pub fn new(interface: String, if_index: Option<u32>, assigned_ip: IpAddr) -> Self {
Self {
interface,
if_index,
assigned_ip,
state: Arc::new(Mutex::new(DnsState::default())),
}
}
/// Set DNS servers for the interface
pub async fn set_dns(&self, dns_servers: &[IpAddr]) -> Result<()> {
if dns_servers.is_empty() {
return Ok(());
}
// Lock state
let mut state = self.state.lock().await;
// Platform specific implementation
#[cfg(target_os = "linux")]
{
linux::set_dns(&self.interface, self.if_index, dns_servers, self.assigned_ip, &mut state).await?;
}
#[cfg(target_os = "windows")]
{
windows::set_dns(&self.interface, self.if_index, dns_servers, &mut state).await?;
}
#[cfg(target_os = "macos")]
{
macos::set_dns(&self.interface, dns_servers, &mut state).await?;
}
#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
{
tracing::warn!("DNS configuration not supported on this platform");
}
Ok(())
}
/// Restore original DNS settings
pub async fn restore(&self) -> Result<()> {
let mut state = self.state.lock().await;
if !state.configured {
return Ok(());
}
tracing::info!("Restoring DNS settings...");
#[cfg(target_os = "linux")]
{
linux::restore(&self.interface, self.if_index, &mut state).await?;
}
#[cfg(target_os = "windows")]
{
windows::restore(&self.interface, self.if_index, &mut state).await?;
}
#[cfg(target_os = "macos")]
{
macos::restore(&self.interface, &mut state).await?;
}
state.configured = false;
Ok(())
}
}

View File

@@ -0,0 +1,99 @@
use super::DnsState;
use anyhow::Result;
use std::net::IpAddr;
pub async fn set_dns(
interface: &str,
if_index: Option<u32>,
dns_servers: &[IpAddr],
state: &mut DnsState,
) -> Result<()> {
// Windows implementation using windows-rs crate
// Note: This logic runs within unsafe blocks as it calls Win32 APIs
use std::ffi::c_void;
use windows::Win32::Foundation::NO_ERROR;
use windows::Win32::NetworkManagement::IpHelper::{
SetInterfaceDnsSettings, DNS_INTERFACE_SETTINGS, DNS_INTERFACE_SETTINGS_FLAGS,
DNS_INTERFACE_SETTINGS_VERSION1, DNS_SETTING_NAMESERVER,
};
// We need the Interface LUID (Locally Unique Identifier) or Index/GUID.
// The user provided `interface` string might be a GUID or friendly name.
// If if_index is provided, that's easier for some APIs.
// Convert IP addresses to wide string (UTF-16) comma-separated
let mut dns_str = String::new();
for (i, ip) in dns_servers.iter().enumerate() {
if i > 0 {
dns_str.push(',');
}
dns_str.push_str(&ip.to_string());
}
let dns_wide: Vec<u16> = dns_str.encode_utf16().chain(std::iter::once(0)).collect();
if let Some(idx) = if_index {
// Find existing settings to backup?
// Windows doesn't make it easy to "get current and restore later" without some work.
// For simplicity, we assume we can just clear settings on restore.
}
/*
Implementation note:
Since we cannot easily compile this on Linux to verify, we outline the logic.
Real implementation would need `GetInterfaceDnsSettings` to backup state.
*/
tracing::info!(
"Setting DNS on Windows for interface {}: {:?}",
interface,
dns_servers
);
/*
unsafe {
let mut settings = DNS_INTERFACE_SETTINGS {
Version: DNS_INTERFACE_SETTINGS_VERSION1,
Flags: DNS_SETTING_NAMESERVER,
NameServer: windows::core::PCWSTR(dns_wide.as_ptr()),
..Default::default()
};
let guid = windows::core::GUID::from(interface)?; // Assuming interface is a GUID string
let result = SetInterfaceDnsSettings(guid, &mut settings);
if result != NO_ERROR {
anyhow::bail!("Failed to set DNS: {:?}", result);
}
}
*/
// For now, since we can't test, we log a limitation
tracing::warn!("Windows DNS setting logic placeholder");
state.configured = true;
Ok(())
}
pub async fn restore(
_interface: &str,
_if_index: Option<u32>,
_state: &mut DnsState,
) -> Result<()> {
tracing::info!("Restoring DNS on Windows...");
// Clear custom DNS settings (return to DHCP?)
/*
unsafe {
let mut settings = DNS_INTERFACE_SETTINGS {
Version: DNS_INTERFACE_SETTINGS_VERSION1,
Flags: DNS_SETTING_NAMESERVER,
NameServer: windows::core::PCWSTR::null(),
..Default::default()
};
// Call SetInterfaceDnsSettings with empty NameServer or appropriate flags
}
*/
Ok(())
}

View File

@@ -0,0 +1,19 @@
//! `konduit-platform` — auditable building blocks of the Konduit VPN engine.
//!
//! Published under PolyForm Noncommercial 1.0.0.
//! The protocol framing, handshake, and stealth layers are not included here.
//!
//! # Modules
//! - [`crypto`] — X25519 key exchange, ChaCha20-Poly1305 cipher, Ed25519 identity keys,
//! Argon2id key derivation, BLAKE3-keyed MACs, session key derivation, nonce counters.
//! - [`stats`] — connection statistics with a 300-second ring-buffer history.
//! - [`tun`] — cross-platform TUN device (Linux, macOS, Android/iOS fd).
//! - [`dns`] — cross-platform DNS configuration (systemd-resolved D-Bus, resolvectl,
//! macOS NetworkSetup, Windows SetInterfaceDnsSettings).
//! - [`routes`] — VPN routing: Linux netlink, macOS `route`/`networksetup`.
pub mod crypto;
pub mod dns;
pub mod routes;
pub mod stats;
pub mod tun;

View File

@@ -0,0 +1,444 @@
use anyhow::{Context, Result};
use futures::stream::TryStreamExt;
use netlink_packet_route::route::RouteProtocol;
use rtnetlink::{new_connection, Handle, IpVersion};
use std::net::{IpAddr, Ipv4Addr};
use tracing::{info, warn};
const VPN_ROUTE_METRIC: u32 = 50;
/// Manages routing table for VPN connection using netlink.
/// Tracks every route it installs and removes exactly those on teardown.
/// Pre-existing routes are never touched.
pub struct RouteManager {
handle: Handle,
added_routes: Vec<(Ipv4Addr, u8)>,
tun_interface: String,
tun_index: Option<u32>,
dns_manager: crate::dns::DnsManager,
// Saved during setup for re-adding the server host route after resume.
server_ip: Option<Ipv4Addr>,
wifi_gateway: Option<Ipv4Addr>,
// Saved for re-adding the VPN default route after reconnect.
vpn_gateway: Option<Ipv4Addr>,
}
impl RouteManager {
pub async fn new(tun_interface: String) -> Result<Self> {
let (connection, handle, _) =
new_connection().context("Failed to create netlink connection")?;
tokio::spawn(connection);
Ok(Self {
handle,
added_routes: Vec::new(),
tun_interface: tun_interface.clone(),
tun_index: None,
dns_manager: crate::dns::DnsManager::new(tun_interface, None, IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
server_ip: None,
wifi_gateway: None,
vpn_gateway: None,
})
}
async fn get_interface_index(&self, name: &str) -> Result<u32> {
let mut links = self
.handle
.link()
.get()
.match_name(name.to_string())
.execute();
if let Some(link) = links.try_next().await? {
Ok(link.header.index)
} else {
anyhow::bail!("Interface {} not found", name)
}
}
async fn get_default_gateway(&self) -> Result<Ipv4Addr> {
let mut routes = self.handle.route().get(IpVersion::V4).execute();
while let Some(route) = routes.try_next().await? {
if route.header.destination_prefix_length == 0 {
for nla in route.attributes.iter() {
if let netlink_packet_route::route::RouteAttribute::Gateway(addr) = nla {
use netlink_packet_route::route::RouteAddress;
if let RouteAddress::Inet(ipv4_addr) = addr {
return Ok(*ipv4_addr);
}
}
}
}
}
anyhow::bail!("No default gateway found")
}
/// Install routes for the VPN connection.
///
/// Uses metric 50 (lower than typical DHCP default of 100) so VPN routes
/// win without deleting the original default route.
pub async fn setup_vpn_routes(
&mut self,
server_addr: &str,
vpn_gateway: IpAddr,
routes: &[String],
) -> Result<()> {
info!("Setting up VPN routes...");
self.tun_index = Some(
self.get_interface_index(&self.tun_interface)
.await
.context("Failed to get TUN interface index")?,
);
info!(
"TUN interface {} has index {}",
self.tun_interface,
self.tun_index.unwrap()
);
let current_gateway = self
.get_default_gateway()
.await
.context("Failed to get current gateway")?;
info!("Current gateway: {}", current_gateway);
// Resolve server address to IP
let server_ip = if let Ok(ip) = server_addr.parse::<IpAddr>() {
ip
} else {
let host = server_addr.split(':').next().unwrap_or(server_addr);
let addrs = tokio::net::lookup_host(format!("{}:0", host))
.await
.context("Failed to resolve server hostname")?;
addrs
.map(|addr| addr.ip())
.next()
.context("No IP address found for server")?
};
info!("Server IP: {}", server_ip);
// Host route for VPN server through current gateway so the tunnel itself
// doesn't loop back through itself.
if let IpAddr::V4(server_v4) = server_ip {
self.server_ip = Some(server_v4);
self.wifi_gateway = Some(current_gateway);
let result = self
.handle
.route()
.add()
.v4()
.destination_prefix(server_v4, 32)
.gateway(current_gateway)
.priority(VPN_ROUTE_METRIC)
.execute()
.await;
match result {
Ok(_) => self.added_routes.push((server_v4, 32)),
Err(e) => {
let err_msg = format!("{:?}", e);
if err_msg.contains("File exists")
|| err_msg.contains("EEXIST")
|| err_msg.contains("code: Some(-17)")
{
warn!("Server route already exists, continuing...");
} else {
return Err(anyhow::anyhow!("Failed to add server route: {:#}", e));
}
}
}
} else {
warn!("IPv6 server addresses not yet supported for routing");
}
let IpAddr::V4(gateway_v4) = vpn_gateway else {
return Err(anyhow::anyhow!("IPv6 VPN gateways not supported"));
};
self.vpn_gateway = Some(gateway_v4);
let is_full_tunnel = routes.iter().any(|r| r == "0.0.0.0/0");
if is_full_tunnel {
info!("Full-tunnel mode: adding default route via VPN...");
self.handle
.route()
.add()
.v4()
.destination_prefix(Ipv4Addr::new(0, 0, 0, 0), 0)
.gateway(gateway_v4)
.output_interface(self.tun_index.unwrap())
.priority(VPN_ROUTE_METRIC)
.protocol(RouteProtocol::Boot)
.execute()
.await
.map_err(|e| anyhow::anyhow!("Failed to add VPN default route: {:?}", e))?;
self.added_routes.push((Ipv4Addr::new(0, 0, 0, 0), 0));
} else {
info!(
"Split-tunnel mode: adding {} routes via VPN...",
routes.len()
);
for route_str in routes {
let Some((dest_str, prefix_str)) = route_str.split_once('/') else {
warn!("Invalid route format: {}", route_str);
continue;
};
let dest: Ipv4Addr = match dest_str.parse() {
Ok(ip) => ip,
Err(_) => {
warn!("Invalid route destination: {}", dest_str);
continue;
}
};
let prefix_len: u8 = match prefix_str.parse() {
Ok(p) => p,
Err(_) => {
warn!("Invalid prefix length: {}", prefix_str);
continue;
}
};
let result = self
.handle
.route()
.add()
.v4()
.destination_prefix(dest, prefix_len)
.gateway(gateway_v4)
.output_interface(self.tun_index.unwrap())
.priority(VPN_ROUTE_METRIC)
.execute()
.await;
match result {
Ok(_) => {
self.added_routes.push((dest, prefix_len));
info!("Added route: {}/{}", dest, prefix_len);
}
Err(e) => {
let err_msg = format!("{:?}", e);
if err_msg.contains("File exists")
|| err_msg.contains("EEXIST")
|| err_msg.contains("code: Some(-17)")
{
warn!("Route {} already exists, continuing...", route_str);
} else {
warn!("Failed to add route {}: {:?}", route_str, e);
}
}
}
}
}
info!("✅ VPN routes configured successfully");
Ok(())
}
/// Setup DNS configuration
pub async fn setup_dns(&mut self, dns_servers: &[IpAddr], assigned_ip: IpAddr) -> Result<()> {
if dns_servers.is_empty() {
return Ok(());
}
self.dns_manager =
crate::dns::DnsManager::new(self.tun_interface.clone(), self.tun_index, assigned_ip);
self.dns_manager.set_dns(dns_servers).await
}
/// Restore DNS configuration
pub async fn restore_dns(&self) -> Result<()> {
self.dns_manager.restore().await
}
/// Re-add the server host route if it has been removed by the kernel.
///
/// On Linux, suspending the machine brings wlan0 down, which causes the kernel
/// to purge all routes installed on that interface — including our host route for
/// the VPN server IP via the WiFi gateway. Without that route, reconnect SYNs
/// follow the VPN default route into tun0, get dropped, and time out. This
/// method must be called before each reconnect attempt.
pub async fn ensure_server_route(&mut self) -> Result<()> {
let (server_ip, gateway) = match (self.server_ip, self.wifi_gateway) {
(Some(s), Some(g)) => (s, g),
_ => return Ok(()),
};
let result = self
.handle
.route()
.add()
.v4()
.destination_prefix(server_ip, 32)
.gateway(gateway)
.priority(VPN_ROUTE_METRIC)
.execute()
.await;
match result {
Ok(_) => {
info!(
"Re-added server host route {}/32 via {} (was removed on suspend)",
server_ip, gateway
);
}
Err(e) => {
let msg = format!("{:?}", e);
if msg.contains("File exists") || msg.contains("EEXIST") || msg.contains("code: Some(-17)") {
// Route is already present — nothing to do.
} else {
return Err(anyhow::anyhow!("Failed to re-add server route: {:#}", e));
}
}
}
Ok(())
}
/// Remove the VPN default route (0.0.0.0/0) during a reconnect window so
/// traffic falls through to the WiFi default route (metric 600).
/// Call `resume_default_route()` after the new session is established.
pub async fn suspend_default_route(&mut self) -> Result<()> {
if !self.added_routes.contains(&(Ipv4Addr::new(0, 0, 0, 0), 0)) {
return Ok(());
}
let mut routes = self.handle.route().get(IpVersion::V4).execute();
let mut to_delete = None;
while let Some(route) = routes.try_next().await? {
if route.header.destination_prefix_length == 0 {
let is_ours = route.attributes.iter().any(|nla| {
matches!(
nla,
netlink_packet_route::route::RouteAttribute::Priority(p)
if *p == VPN_ROUTE_METRIC
)
});
if is_ours {
to_delete = Some(route);
break;
}
}
}
if let Some(route) = to_delete {
self.handle.route().del(route).execute().await.map_err(|e| {
anyhow::anyhow!("Failed to suspend VPN default route: {:?}", e)
})?;
info!("VPN default route suspended — traffic falls to WiFi during reconnect");
}
Ok(())
}
/// Re-add the VPN default route after a successful reconnect.
pub async fn resume_default_route(&mut self) -> Result<()> {
let gateway = match self.vpn_gateway {
Some(gw) => gw,
None => return Ok(()),
};
if !self.added_routes.contains(&(Ipv4Addr::new(0, 0, 0, 0), 0)) {
return Ok(());
}
let result = self
.handle
.route()
.add()
.v4()
.destination_prefix(Ipv4Addr::new(0, 0, 0, 0), 0)
.gateway(gateway)
.output_interface(self.tun_index.unwrap_or(0))
.priority(VPN_ROUTE_METRIC)
.protocol(RouteProtocol::Boot)
.execute()
.await;
match result {
Ok(_) => info!("VPN default route restored after reconnect"),
Err(e) => {
let msg = format!("{:?}", e);
if msg.contains("File exists") || msg.contains("EEXIST") || msg.contains("code: Some(-17)") {
// already present
} else {
return Err(anyhow::anyhow!("Failed to resume VPN default route: {:#}", e));
}
}
}
Ok(())
}
/// Remove every route that was added by `setup_vpn_routes`.
/// Pre-existing routes are untouched.
pub async fn restore_routes(&mut self) -> Result<()> {
info!(
"Removing {} tracked VPN routes...",
self.added_routes.len()
);
if self.added_routes.is_empty() {
return Ok(());
}
let mut routes_to_delete = Vec::new();
let mut routes = self.handle.route().get(IpVersion::V4).execute();
while let Some(route) = routes.try_next().await? {
let prefix_len = route.header.destination_prefix_length;
let dest_ip: Option<Ipv4Addr> = if prefix_len == 0 {
Some(Ipv4Addr::new(0, 0, 0, 0))
} else {
route.attributes.iter().find_map(|nla| {
if let netlink_packet_route::route::RouteAttribute::Destination(addr) = nla {
use netlink_packet_route::route::RouteAddress;
if let RouteAddress::Inet(ipv4) = addr {
return Some(*ipv4);
}
}
None
})
};
if let Some(ip) = dest_ip {
if self.added_routes.contains(&(ip, prefix_len)) {
let is_ours = route.attributes.iter().any(|nla| {
matches!(
nla,
netlink_packet_route::route::RouteAttribute::Priority(p)
if *p == VPN_ROUTE_METRIC
)
});
if is_ours {
routes_to_delete.push(route);
}
}
}
}
for route in routes_to_delete {
if let Err(e) = self.handle.route().del(route).execute().await {
warn!("Failed to delete tracked route: {:?}", e);
}
}
self.added_routes.clear();
info!("✅ VPN routes removed");
Ok(())
}
}
impl Drop for RouteManager {
fn drop(&mut self) {
if !self.added_routes.is_empty() {
warn!(
"RouteManager dropped with {} unrestored routes — call restore_routes() before dropping",
self.added_routes.len()
);
}
}
}

View File

@@ -0,0 +1,198 @@
use anyhow::Result;
use std::net::IpAddr;
use tracing::{info, warn};
pub struct MacOsNetManager {
pub tun_name: String,
server_host: String,
original_gateway: String,
primary_service: String,
}
impl MacOsNetManager {
/// Discover the current default gateway and primary network service, then return a manager.
/// Uses blocking `std::process::Command` — call from a sync context or `spawn_blocking`.
pub fn new(tun_name: String, server_endpoint: &str) -> Result<Self> {
let server_host = server_endpoint
.split(':')
.next()
.unwrap_or(server_endpoint)
.to_string();
let route_out = std::process::Command::new("route")
.args(["-n", "get", "default"])
.output()?;
let route_text = String::from_utf8_lossy(&route_out.stdout).into_owned();
let (original_gateway, iface) = Self::parse_default_route(&route_text)?;
let svc_out = std::process::Command::new("networksetup")
.args(["-listnetworkserviceorder"])
.output()?;
let svc_text = String::from_utf8_lossy(&svc_out.stdout).into_owned();
let primary_service = Self::parse_primary_service(&svc_text, &iface)
.unwrap_or_else(|_| "Wi-Fi".to_string());
info!(
"MacOsNetManager: gateway={}, service={}, tun={}",
original_gateway, primary_service, tun_name
);
Ok(Self {
tun_name,
server_host,
original_gateway,
primary_service,
})
}
// Always adds split-default (0/1 + 128/1) for full-tunnel mode.
// Split-tunnel (per-route from ServerConfig) is not yet supported on macOS.
pub async fn setup_vpn_routes(&self) -> Result<()> {
run("route", &["add", "-host", &self.server_host, &self.original_gateway]).await;
run("route", &["add", "-net", "0.0.0.0/1", "-interface", &self.tun_name]).await;
run("route", &["add", "-net", "128.0.0.0/1", "-interface", &self.tun_name]).await;
info!("macOS VPN routes added via {}", self.tun_name);
Ok(())
}
/// Configure DNS for the primary network service.
pub async fn setup_dns(&self, dns_ips: &[IpAddr], _assigned_ip: IpAddr) -> Result<()> {
if dns_ips.is_empty() {
return Ok(());
}
let ip_strings: Vec<String> = dns_ips.iter().map(|ip| ip.to_string()).collect();
let mut args: Vec<&str> = vec!["-setdnsservers", self.primary_service.as_str()];
args.extend(ip_strings.iter().map(|s| s.as_str()));
run("networksetup", &args).await;
info!("macOS DNS configured: {:?}", dns_ips);
Ok(())
}
/// Remove split-default routes so traffic falls through to WiFi during reconnect.
pub async fn suspend_default_route(&self) -> Result<()> {
run("route", &["delete", "-net", "0.0.0.0/1"]).await;
run("route", &["delete", "-net", "128.0.0.0/1"]).await;
info!("macOS default route suspended");
Ok(())
}
/// Re-add split-default routes after successful reconnect.
pub async fn resume_default_route(&self) -> Result<()> {
run("route", &["add", "-net", "0.0.0.0/1", "-interface", &self.tun_name]).await;
run("route", &["add", "-net", "128.0.0.0/1", "-interface", &self.tun_name]).await;
info!("macOS default route resumed via {}", self.tun_name);
Ok(())
}
// NOTE: original_gateway is captured once at construction time.
// If the WiFi gateway changes (network roaming), this route may go via
// the wrong gateway until the next reconnect that creates a new MacOsNetManager.
pub async fn ensure_server_route(&self) -> Result<()> {
run("route", &["delete", "-host", &self.server_host]).await;
run("route", &["add", "-host", &self.server_host, &self.original_gateway]).await;
Ok(())
}
/// Remove all VPN routes.
pub async fn restore_routes(&self) -> Result<()> {
run("route", &["delete", "-host", &self.server_host]).await;
run("route", &["delete", "-net", "0.0.0.0/1"]).await;
run("route", &["delete", "-net", "128.0.0.0/1"]).await;
info!("macOS VPN routes removed");
Ok(())
}
/// Revert DNS to automatic (empty = use DHCP-provided DNS).
pub async fn restore_dns(&self) -> Result<()> {
run("networksetup", &["-setdnsservers", &self.primary_service, "Empty"]).await;
info!("macOS DNS restored to automatic");
Ok(())
}
fn parse_default_route(output: &str) -> Result<(String, String)> {
let gateway = output
.lines()
.find(|l| l.trim_start().starts_with("gateway:"))
.and_then(|l| l.splitn(2, ':').nth(1))
.map(|s| s.trim().to_string())
.ok_or_else(|| anyhow::anyhow!("Could not find default gateway in route output"))?;
let iface = output
.lines()
.find(|l| l.trim_start().starts_with("interface:"))
.and_then(|l| l.splitn(2, ':').nth(1))
.map(|s| s.trim().to_string())
.ok_or_else(|| anyhow::anyhow!("Could not find default interface in route output"))?;
Ok((gateway, iface))
}
fn parse_primary_service(output: &str, iface: &str) -> Result<String> {
let lines: Vec<&str> = output.lines().collect();
for i in 0..lines.len().saturating_sub(1) {
if lines[i + 1].contains(&format!("Device: {}", iface)) {
let name = lines[i].trim();
let name = name.splitn(2, ") ").nth(1).unwrap_or(name.trim());
return Ok(name.to_string());
}
}
anyhow::bail!("Could not find network service for interface {}", iface)
}
}
async fn run(cmd: &str, args: &[&str]) {
match tokio::process::Command::new(cmd).args(args).output().await {
Ok(out) if out.status.success() => {}
Ok(out) => warn!(
"{} {} failed: {}",
cmd,
args.join(" "),
String::from_utf8_lossy(&out.stderr).trim()
),
Err(e) => warn!("Failed to run {}: {}", cmd, e),
}
}
#[cfg(test)]
mod tests {
use super::*;
const ROUTE_OUTPUT: &str = " route to: default
destination: default
mask: default
gateway: 192.168.1.1
interface: en0
flags: <UP,GATEWAY,DONE,STATIC,PRCLONING,GLOBAL>
recvpipe sendpipe ssthresh rtt,msec rtt,var hopcount mtu expire
0 0 0 35 128 0 1500 0 ";
const SERVICES_OUTPUT: &str = "An asterisk (*) denotes that a network service is disabled.
(1) Wi-Fi
(Hardware Port: Wi-Fi, Device: en0)
(2) Bluetooth PAN
(Hardware Port: Bluetooth PAN, Device: en1)
(3) Thunderbolt Bridge
(Hardware Port: Thunderbolt Bridge, Device: bridge0)
";
#[test]
fn parses_gateway_and_interface() {
let (gw, iface) = MacOsNetManager::parse_default_route(ROUTE_OUTPUT).unwrap();
assert_eq!(gw, "192.168.1.1");
assert_eq!(iface, "en0");
}
#[test]
fn parses_primary_service() {
let svc = MacOsNetManager::parse_primary_service(SERVICES_OUTPUT, "en0").unwrap();
assert_eq!(svc, "Wi-Fi");
}
#[test]
fn parse_service_unknown_iface_returns_err() {
let result = MacOsNetManager::parse_primary_service(SERVICES_OUTPUT, "en99");
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,9 @@
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
pub use linux::RouteManager;
#[cfg(target_os = "macos")]
mod macos;
#[cfg(target_os = "macos")]
pub use macos::MacOsNetManager;

View File

@@ -0,0 +1,353 @@
// Connection statistics tracking for konduit
// Provides real-time metrics and historical data for GUI display
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Ring buffer size for historical data (300 seconds = 5 minutes at 1 sample/sec)
const HISTORY_SIZE: usize = 300;
/// Connection statistics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionStats {
/// Connection status: "connected", "disconnected", "connecting"
pub status: String,
/// Duration in seconds since connection started
pub duration: u64,
/// Total bytes uploaded
pub upload_bytes: u64,
/// Total bytes downloaded
pub download_bytes: u64,
/// Current upload speed (bytes/sec)
pub upload_speed: u64,
/// Current download speed (bytes/sec)
pub download_speed: u64,
/// Server address
pub server: String,
/// Peer ID
pub peer_id: String,
/// Local TUN interface IP (e.g., "10.0.0.2")
pub local_ip: Option<String>,
/// Upload speed history (last 300 seconds)
#[serde(skip_serializing_if = "Option::is_none")]
pub upload_history: Option<Vec<u64>>,
/// Download speed history (last 300 seconds)
#[serde(skip_serializing_if = "Option::is_none")]
pub download_history: Option<Vec<u64>>,
}
/// Statistics tracker for a VPN connection
pub struct StatsTracker {
/// Connection start time
start_time: Instant,
/// Total bytes uploaded (atomic for thread-safe updates)
upload_bytes: Arc<AtomicU64>,
/// Total bytes downloaded (atomic for thread-safe updates)
download_bytes: Arc<AtomicU64>,
/// Previous upload bytes (for speed calculation)
prev_upload: AtomicU64,
/// Previous download bytes (for speed calculation)
prev_download: AtomicU64,
/// Upload speed history ring buffer
upload_history: parking_lot::Mutex<RingBuffer>,
/// Download speed history ring buffer
download_history: parking_lot::Mutex<RingBuffer>,
/// Server address
server: String,
/// Peer ID
peer_id: String,
/// Local TUN IP
local_ip: parking_lot::Mutex<Option<String>>,
}
/// Ring buffer for historical speed data
struct RingBuffer {
data: Vec<u64>,
index: usize,
filled: bool,
}
impl RingBuffer {
fn new() -> Self {
Self {
data: vec![0; HISTORY_SIZE],
index: 0,
filled: false,
}
}
fn push(&mut self, value: u64) {
self.data[self.index] = value;
self.index = (self.index + 1) % HISTORY_SIZE;
if self.index == 0 {
self.filled = true;
}
}
fn to_vec(&self) -> Vec<u64> {
if !self.filled {
// Not filled yet, return only valid data (newest first)
self.data[0..self.index].iter().rev().copied().collect()
} else {
// Buffer is full, return in correct order (newest first)
let mut result = Vec::with_capacity(HISTORY_SIZE);
// Start from current position - 1 (newest) and go backwards
for i in 0..HISTORY_SIZE {
let idx = (self.index + HISTORY_SIZE - 1 - i) % HISTORY_SIZE;
result.push(self.data[idx]);
}
result
}
}
}
impl StatsTracker {
/// Create a new stats tracker
pub fn new(server: String, peer_id: String) -> Self {
Self {
start_time: Instant::now(),
upload_bytes: Arc::new(AtomicU64::new(0)),
download_bytes: Arc::new(AtomicU64::new(0)),
prev_upload: AtomicU64::new(0),
prev_download: AtomicU64::new(0),
upload_history: parking_lot::Mutex::new(RingBuffer::new()),
download_history: parking_lot::Mutex::new(RingBuffer::new()),
server,
peer_id,
local_ip: parking_lot::Mutex::new(None),
}
}
/// Get atomic upload counter (for incrementing from packet handlers)
pub fn upload_counter(&self) -> Arc<AtomicU64> {
Arc::clone(&self.upload_bytes)
}
/// Get atomic download counter (for incrementing from packet handlers)
pub fn download_counter(&self) -> Arc<AtomicU64> {
Arc::clone(&self.download_bytes)
}
/// Record bytes uploaded (convenience method)
pub fn record_upload(&self, bytes: usize) {
self.upload_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
}
/// Record bytes downloaded (convenience method)
pub fn record_download(&self, bytes: usize) {
self.download_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
}
/// Set local TUN interface IP
pub fn set_local_ip(&self, ip: String) {
*self.local_ip.lock() = Some(ip);
}
/// Update speed calculations and history (call this every second)
pub fn update(&self) {
let current_upload = self.upload_bytes.load(Ordering::Relaxed);
let current_download = self.download_bytes.load(Ordering::Relaxed);
let prev_upload = self.prev_upload.swap(current_upload, Ordering::Relaxed);
let prev_download = self.prev_download.swap(current_download, Ordering::Relaxed);
let upload_speed = current_upload.saturating_sub(prev_upload);
let download_speed = current_download.saturating_sub(prev_download);
self.upload_history.lock().push(upload_speed);
self.download_history.lock().push(download_speed);
}
/// Get current statistics snapshot
pub fn get_stats(&self, include_history: bool) -> ConnectionStats {
let current_upload = self.upload_bytes.load(Ordering::Relaxed);
let current_download = self.download_bytes.load(Ordering::Relaxed);
let prev_upload = self.prev_upload.load(Ordering::Relaxed);
let prev_download = self.prev_download.load(Ordering::Relaxed);
ConnectionStats {
status: "connected".to_string(),
duration: self.start_time.elapsed().as_secs(),
upload_bytes: current_upload,
download_bytes: current_download,
upload_speed: current_upload.saturating_sub(prev_upload),
download_speed: current_download.saturating_sub(prev_download),
server: self.server.clone(),
peer_id: self.peer_id.clone(),
local_ip: self.local_ip.lock().clone(),
upload_history: if include_history {
Some(self.upload_history.lock().to_vec())
} else {
None
},
download_history: if include_history {
Some(self.download_history.lock().to_vec())
} else {
None
},
}
}
/// Get stats as JSON string
pub fn get_stats_json(&self, include_history: bool) -> Result<String, serde_json::Error> {
serde_json::to_string(&self.get_stats(include_history))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_stats_tracker_creation() {
let tracker = StatsTracker::new("utilbox.eu:8443".to_string(), "test".to_string());
let stats = tracker.get_stats(false);
assert_eq!(stats.status, "connected");
assert_eq!(stats.upload_bytes, 0);
assert_eq!(stats.download_bytes, 0);
assert_eq!(stats.server, "utilbox.eu:8443");
assert_eq!(stats.peer_id, "test");
}
#[test]
fn test_record_traffic() {
let tracker = StatsTracker::new("utilbox.eu:8443".to_string(), "test".to_string());
tracker.record_upload(1024);
tracker.record_download(2048);
let stats = tracker.get_stats(false);
assert_eq!(stats.upload_bytes, 1024);
assert_eq!(stats.download_bytes, 2048);
}
#[test]
fn test_speed_calculation() {
let tracker = StatsTracker::new("utilbox.eu:8443".to_string(), "test".to_string());
// Record some traffic
tracker.record_upload(1024);
tracker.record_download(2048);
// Update to calculate speed
tracker.update();
let stats = tracker.get_stats(false);
assert_eq!(stats.upload_speed, 1024);
assert_eq!(stats.download_speed, 2048);
// Record more traffic
tracker.record_upload(512);
tracker.record_download(1024);
// Update again
tracker.update();
let stats = tracker.get_stats(false);
assert_eq!(stats.upload_bytes, 1536);
assert_eq!(stats.download_bytes, 3072);
assert_eq!(stats.upload_speed, 512);
assert_eq!(stats.download_speed, 1024);
}
#[test]
fn test_ring_buffer() {
let mut buffer = RingBuffer::new();
// Add some values
for i in 0..10 {
buffer.push(i);
}
let vec = buffer.to_vec();
assert_eq!(vec.len(), 10);
assert_eq!(vec[0], 9); // Newest first
assert_eq!(vec[9], 0); // Oldest last
}
#[test]
fn test_ring_buffer_wraparound() {
let mut buffer = RingBuffer::new();
// Fill buffer and wrap around
for i in 0..350 {
buffer.push(i);
}
let vec = buffer.to_vec();
assert_eq!(vec.len(), HISTORY_SIZE);
assert_eq!(vec[0], 349); // Newest
assert_eq!(vec[HISTORY_SIZE - 1], 50); // Oldest (350 - 300)
}
#[test]
fn test_json_serialization() {
let tracker = StatsTracker::new("utilbox.eu:8443".to_string(), "test".to_string());
tracker.record_upload(1024);
tracker.record_download(2048);
tracker.set_local_ip("10.0.0.2".to_string());
let json = tracker.get_stats_json(false).unwrap();
assert!(json.contains("\"status\":\"connected\""));
assert!(json.contains("\"upload_bytes\":1024"));
assert!(json.contains("\"download_bytes\":2048"));
assert!(json.contains("\"local_ip\":\"10.0.0.2\""));
}
#[test]
fn test_atomic_counters() {
let tracker = StatsTracker::new("utilbox.eu:8443".to_string(), "test".to_string());
let upload_counter = tracker.upload_counter();
let download_counter = tracker.download_counter();
// Simulate concurrent updates
let handles: Vec<_> = (0..10)
.map(|_| {
let up = Arc::clone(&upload_counter);
let down = Arc::clone(&download_counter);
thread::spawn(move || {
for _ in 0..100 {
up.fetch_add(1, Ordering::Relaxed);
down.fetch_add(2, Ordering::Relaxed);
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
let stats = tracker.get_stats(false);
assert_eq!(stats.upload_bytes, 1000); // 10 threads * 100 increments
assert_eq!(stats.download_bytes, 2000); // 10 threads * 100 * 2
}
}

165
konduit-platform/src/tun.rs Normal file
View File

@@ -0,0 +1,165 @@
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use std::net::IpAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info};
/// Interface to the TUN device
pub struct TunDevice {
pub name: String,
device: tun::AsyncDevice,
mtu: usize,
}
impl TunDevice {
/// Create a new TUN device (Linux only)
#[cfg(target_os = "linux")]
pub fn new(assigned_ip: IpAddr, mtu: usize) -> Result<Self> {
// Try to create TUN interface with incremental numbers
for i in 0..100 {
let tun_name = format!("tun{}", i);
let mut tun_config = tun::Configuration::default();
tun_config
.name(&tun_name)
.address(assigned_ip)
.netmask((255, 255, 255, 0))
.mtu(mtu as i32)
.up();
tun_config.platform(|config| {
config.packet_information(false);
});
match tun::create_as_async(&tun_config) {
Ok(device) => {
info!("Created TUN interface: {}", tun_name);
return Ok(Self {
name: tun_name,
device,
mtu,
});
}
Err(e) => {
debug!("Failed to create {}: {}, trying next...", tun_name, e);
continue;
}
}
}
anyhow::bail!(
"Failed to create TUN device: all interface numbers 0-99 are in use or unavailable"
)
}
/// Create a new TUN device (macOS only — creates a utun interface)
#[cfg(target_os = "macos")]
pub fn new(assigned_ip: IpAddr, mtu: usize) -> Result<Self> {
let mut tun_config = tun::Configuration::default();
tun_config
.address(assigned_ip)
.destination(assigned_ip)
.netmask((255, 255, 255, 255))
.mtu(mtu as i32)
.up();
tun_config.platform(|config| {
config.packet_information(false);
});
match tun::create_as_async(&tun_config) {
Ok(device) => {
let name = device.name().to_string();
info!("Created TUN interface: {}", name);
Ok(Self { name, device, mtu })
}
Err(e) => anyhow::bail!("Failed to create TUN device on macOS: {}", e),
}
}
/// Create a new TUN device from file descriptor (Android / iOS)
#[cfg(any(target_os = "android", target_os = "ios"))]
pub fn from_fd(fd: i32, mtu: usize) -> Result<Self> {
unsafe {
// Ensure the FD is in non-blocking mode for AsyncDevice
let flags = libc::fcntl(fd, libc::F_GETFL);
if flags < 0 {
anyhow::bail!("Failed to get FD flags");
}
if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
anyhow::bail!("Failed to set FD to non-blocking");
}
let mut tun_config = tun::Configuration::default();
tun_config.raw_fd(fd);
match tun::create_as_async(&tun_config) {
Ok(device) => {
info!("Created TUN device from FD: {}", fd);
Ok(Self {
name: "tun0".to_string(), // Name doesn't matter on Android / iOS
device,
mtu,
})
}
Err(e) => {
anyhow::bail!("Failed to create TUN device from FD: {}", e)
}
}
}
}
/// Run the TUN device loop
pub async fn run(
self,
mut from_tcp_rx: mpsc::Receiver<Bytes>, // Packets received from TCP to be written to TUN
to_tcp_tx: mpsc::Sender<Bytes>, // Packets read from TUN to be sent to TCP
) -> Result<()> {
info!("TUN device loop started");
let (mut reader, mut writer) = tokio::io::split(self.device);
let mtu = self.mtu;
loop {
// Buffer for reading from TUN
// We authorize up to MTU + overhead, but each read returns one packet.
let mut buf = BytesMut::with_capacity(mtu + 100);
tokio::select! {
// 1. Packet from TUN (needs to be sent to server)
res = reader.read_buf(&mut buf) => {
match res {
Ok(n) => {
if n == 0 {
info!("TUN device closed");
break;
}
// Important: TUN read returns distinct packets.
// We must send exactly this packet.
let packet = buf.freeze(); // Consumes buf into Bytes
if let Err(_) = to_tcp_tx.send(packet).await {
break; // Channel closed
}
}
Err(e) => {
error!("TUN read error: {}", e);
break;
}
}
}
// 2. Packet from TCP (needs to be written to TUN)
Some(packet) = from_tcp_rx.recv() => {
if let Err(e) = writer.write_all(&packet).await {
error!("TUN write error: {}", e);
}
}
}
}
Ok(())
}
}