hydra

Terminal replacement for Loopback — virtual audio devices and routing on macOS, from a ratatui TUI.
Log | Files | Refs | README | LICENSE

main.rs (5288B)


      1 //! `hydrad` — the Hydra daemon. It owns all CoreAudio state (taps, aggregate devices,
      2 //! IOProcs) and is the only process that touches the audio engine. Clients connect over
      3 //! a Unix-domain socket.
      4 
      5 mod server;
      6 
      7 use std::error::Error;
      8 use std::fs;
      9 use std::io::{BufReader, BufWriter};
     10 use std::os::unix::fs::PermissionsExt;
     11 use std::os::unix::net::{UnixListener, UnixStream};
     12 use std::sync::{Arc, Mutex};
     13 
     14 use hydra_core::config::Config;
     15 use hydra_core::engine::Engine;
     16 use hydra_core::ffi::process;
     17 use hydra_ipc::{read_msg, write_msg, Command, Response};
     18 
     19 fn main() -> Result<(), Box<dyn Error>> {
     20     let sock = hydra_ipc::socket_path();
     21     if let Some(dir) = sock.parent() {
     22         fs::create_dir_all(dir)?;
     23     }
     24 
     25     // Single-instance guard. The socket file existing isn't enough to tell a live daemon
     26     // from a stale socket left by a crash, so probe it: if something answers Ping, another
     27     // daemon owns CoreAudio — bail rather than yank the socket out from under it (which is
     28     // how a pile of competing daemons accumulated before). If nothing answers, it's stale.
     29     if let Some(version) = ping_existing(&sock) {
     30         eprintln!("hydrad already running (protocol v{version}) on {}; exiting.", sock.display());
     31         return Ok(());
     32     }
     33     let _ = fs::remove_file(&sock);
     34 
     35     let listener = UnixListener::bind(&sock)?;
     36     fs::set_permissions(&sock, fs::Permissions::from_mode(0o600))?;
     37     eprintln!("hydrad {} listening on {}", hydra_core::VERSION, sock.display());
     38 
     39     let engine = Arc::new(Mutex::new(Engine::new()));
     40 
     41     // Restore saved routes (KeepAlive LaunchAgent ⇒ this runs after every restart/reboot).
     42     // Resolve each saved app's bundle id to a currently-running PID; skip apps not running.
     43     let config = Config::load();
     44     if !config.routes.is_empty() {
     45         let restored = engine.lock().unwrap().restore(&config, resolve_bundle_to_pid);
     46         eprintln!("restored {restored}/{} saved route(s)", config.routes.len());
     47     }
     48 
     49     // Supervisor: the self-healing heart. Every few seconds it reconciles live routes against
     50     // the saved config — rebuilding routes whose aggregate died (coreaudiod restart, driver
     51     // reinstall) and resurrecting saved routes whose app came back. This is what makes Hydra
     52     // "set it and forget it": you create a route once, the daemon keeps it alive forever.
     53     {
     54         let engine = Arc::clone(&engine);
     55         std::thread::spawn(move || supervise(engine));
     56     }
     57 
     58     for conn in listener.incoming() {
     59         match conn {
     60             Ok(stream) => {
     61                 let engine = Arc::clone(&engine);
     62                 std::thread::spawn(move || {
     63                     if let Err(e) = server::handle(stream, engine) {
     64                         eprintln!("connection error: {e}");
     65                     }
     66                 });
     67             }
     68             Err(e) => eprintln!("accept error: {e}"),
     69         }
     70     }
     71 
     72     Ok(())
     73 }
     74 
     75 /// Resolve an app bundle id to a currently-running audio-process PID (None if not playing).
     76 pub(crate) fn resolve_bundle_to_pid(bundle_id: &str) -> Option<i32> {
     77     process::list_audio_processes()
     78         .into_iter()
     79         .find(|a| a.bundle_id.as_deref() == Some(bundle_id))
     80         .map(|a| a.pid)
     81 }
     82 
     83 /// The supervisor loop: reconcile live routes against the saved config on a timer, forever.
     84 /// Holds the engine lock only for the brief reconcile call, never across the sleep.
     85 fn supervise(engine: Arc<Mutex<Engine>>) {
     86     use std::time::Duration;
     87     // 4s: long enough that a healthy route logs several IOProc callbacks between ticks (so a
     88     // frozen counter unambiguously means "aggregate dead"), short enough to feel instant.
     89     const TICK: Duration = Duration::from_secs(4);
     90     loop {
     91         std::thread::sleep(TICK);
     92         let saved = Config::load().routes;
     93         let actions = {
     94             let mut eng = engine.lock().unwrap();
     95             eng.reconcile(&saved, resolve_bundle_to_pid)
     96         };
     97         if !actions.is_empty() {
     98             for a in &actions {
     99                 eprintln!("[supervisor] {a}");
    100             }
    101             // Persist the healed state + nudge the SketchyBar widget.
    102             let cfg = engine.lock().unwrap().to_config();
    103             cfg.save();
    104             let _ = std::process::Command::new("sketchybar")
    105                 .args(["--trigger", "hydra_route_change"])
    106                 .stdout(std::process::Stdio::null())
    107                 .stderr(std::process::Stdio::null())
    108                 .spawn();
    109         }
    110     }
    111 }
    112 
    113 /// Probe an existing socket: returns the protocol version if a live daemon answers `Ping`,
    114 /// or `None` if nothing's there / it's a stale socket. A short timeout keeps a wedged peer
    115 /// from hanging startup.
    116 fn ping_existing(sock: &std::path::Path) -> Option<u32> {
    117     use std::time::Duration;
    118     let stream = UnixStream::connect(sock).ok()?;
    119     stream.set_read_timeout(Some(Duration::from_millis(500))).ok()?;
    120     stream.set_write_timeout(Some(Duration::from_millis(500))).ok()?;
    121     let mut writer = BufWriter::new(stream.try_clone().ok()?);
    122     let mut reader = BufReader::new(stream);
    123     write_msg(&mut writer, &Command::Ping).ok()?;
    124     match read_msg::<_, Response>(&mut reader).ok()? {
    125         Some(Response::Pong { version }) => Some(version),
    126         _ => None,
    127     }
    128 }