main.rs (5288B)
1 //! `hydrad` — the Hydra daemon. It owns all CoreAudio state (taps, aggregate devices, 2 //! IOProcs) and is the only process that touches the audio engine. Clients connect over 3 //! a Unix-domain socket. 4 5 mod server; 6 7 use std::error::Error; 8 use std::fs; 9 use std::io::{BufReader, BufWriter}; 10 use std::os::unix::fs::PermissionsExt; 11 use std::os::unix::net::{UnixListener, UnixStream}; 12 use std::sync::{Arc, Mutex}; 13 14 use hydra_core::config::Config; 15 use hydra_core::engine::Engine; 16 use hydra_core::ffi::process; 17 use hydra_ipc::{read_msg, write_msg, Command, Response}; 18 19 fn main() -> Result<(), Box<dyn Error>> { 20 let sock = hydra_ipc::socket_path(); 21 if let Some(dir) = sock.parent() { 22 fs::create_dir_all(dir)?; 23 } 24 25 // Single-instance guard. The socket file existing isn't enough to tell a live daemon 26 // from a stale socket left by a crash, so probe it: if something answers Ping, another 27 // daemon owns CoreAudio — bail rather than yank the socket out from under it (which is 28 // how a pile of competing daemons accumulated before). If nothing answers, it's stale. 29 if let Some(version) = ping_existing(&sock) { 30 eprintln!("hydrad already running (protocol v{version}) on {}; exiting.", sock.display()); 31 return Ok(()); 32 } 33 let _ = fs::remove_file(&sock); 34 35 let listener = UnixListener::bind(&sock)?; 36 fs::set_permissions(&sock, fs::Permissions::from_mode(0o600))?; 37 eprintln!("hydrad {} listening on {}", hydra_core::VERSION, sock.display()); 38 39 let engine = Arc::new(Mutex::new(Engine::new())); 40 41 // Restore saved routes (KeepAlive LaunchAgent ⇒ this runs after every restart/reboot). 42 // Resolve each saved app's bundle id to a currently-running PID; skip apps not running. 43 let config = Config::load(); 44 if !config.routes.is_empty() { 45 let restored = engine.lock().unwrap().restore(&config, resolve_bundle_to_pid); 46 eprintln!("restored {restored}/{} saved route(s)", config.routes.len()); 47 } 48 49 // Supervisor: the self-healing heart. Every few seconds it reconciles live routes against 50 // the saved config — rebuilding routes whose aggregate died (coreaudiod restart, driver 51 // reinstall) and resurrecting saved routes whose app came back. This is what makes Hydra 52 // "set it and forget it": you create a route once, the daemon keeps it alive forever. 53 { 54 let engine = Arc::clone(&engine); 55 std::thread::spawn(move || supervise(engine)); 56 } 57 58 for conn in listener.incoming() { 59 match conn { 60 Ok(stream) => { 61 let engine = Arc::clone(&engine); 62 std::thread::spawn(move || { 63 if let Err(e) = server::handle(stream, engine) { 64 eprintln!("connection error: {e}"); 65 } 66 }); 67 } 68 Err(e) => eprintln!("accept error: {e}"), 69 } 70 } 71 72 Ok(()) 73 } 74 75 /// Resolve an app bundle id to a currently-running audio-process PID (None if not playing). 76 pub(crate) fn resolve_bundle_to_pid(bundle_id: &str) -> Option<i32> { 77 process::list_audio_processes() 78 .into_iter() 79 .find(|a| a.bundle_id.as_deref() == Some(bundle_id)) 80 .map(|a| a.pid) 81 } 82 83 /// The supervisor loop: reconcile live routes against the saved config on a timer, forever. 84 /// Holds the engine lock only for the brief reconcile call, never across the sleep. 85 fn supervise(engine: Arc<Mutex<Engine>>) { 86 use std::time::Duration; 87 // 4s: long enough that a healthy route logs several IOProc callbacks between ticks (so a 88 // frozen counter unambiguously means "aggregate dead"), short enough to feel instant. 89 const TICK: Duration = Duration::from_secs(4); 90 loop { 91 std::thread::sleep(TICK); 92 let saved = Config::load().routes; 93 let actions = { 94 let mut eng = engine.lock().unwrap(); 95 eng.reconcile(&saved, resolve_bundle_to_pid) 96 }; 97 if !actions.is_empty() { 98 for a in &actions { 99 eprintln!("[supervisor] {a}"); 100 } 101 // Persist the healed state + nudge the SketchyBar widget. 102 let cfg = engine.lock().unwrap().to_config(); 103 cfg.save(); 104 let _ = std::process::Command::new("sketchybar") 105 .args(["--trigger", "hydra_route_change"]) 106 .stdout(std::process::Stdio::null()) 107 .stderr(std::process::Stdio::null()) 108 .spawn(); 109 } 110 } 111 } 112 113 /// Probe an existing socket: returns the protocol version if a live daemon answers `Ping`, 114 /// or `None` if nothing's there / it's a stale socket. A short timeout keeps a wedged peer 115 /// from hanging startup. 116 fn ping_existing(sock: &std::path::Path) -> Option<u32> { 117 use std::time::Duration; 118 let stream = UnixStream::connect(sock).ok()?; 119 stream.set_read_timeout(Some(Duration::from_millis(500))).ok()?; 120 stream.set_write_timeout(Some(Duration::from_millis(500))).ok()?; 121 let mut writer = BufWriter::new(stream.try_clone().ok()?); 122 let mut reader = BufReader::new(stream); 123 write_msg(&mut writer, &Command::Ping).ok()?; 124 match read_msg::<_, Response>(&mut reader).ok()? { 125 Some(Response::Pong { version }) => Some(version), 126 _ => None, 127 } 128 }