hydra

Terminal replacement for Loopback — virtual audio devices and routing on macOS, from a ratatui TUI.
Log | Files | Refs | README | LICENSE

commit 3b772e3d1d8a9921257acd0d64170fa236196394
parent 0e5db1afa327fa25d057474b3232294f392525e5
Author: Matthew Gantenbein <ganten1998@gmail.com>
Date:   Mon,  1 Jun 2026 15:19:38 -0500

P6: self-healing supervisor — routes survive coreaudiod/driver death

The "set it and forget it" finish. Every failure that made routing fragile —
coreaudiod restart, driver reinstall, device vanish — silently killed routes
and forced a manual rebuild. Now the daemon heals itself.

- engine::reconcile(): each supervisor tick, (1) rebuild routes whose IOProc
  callback counter froze (aggregate died — the counter is clocked independently
  of whether the app is making sound, so frozen = dead, not silent), re-resolving
  the app's PID; (2) resurrect saved routes whose app is now running but isn't live.
- route_is_dead(prev,now): the stall judgment extracted as a pure fn, unit-tested
  (fresh route never dead / advancing=alive / frozen=dead) — 3 tests.
- daemon: supervisor thread ticks every 4s (long enough that a healthy route logs
  several callbacks between ticks so a freeze is unambiguous), persists healed
  state, nudges SketchyBar. Deduped resolve_bundle_to_pid into main (pub(crate)).
- LaunchAgent already RunAtLoad + KeepAlive ⇒ daemon auto-starts on login,
  relaunches on crash. Full stack: crash→KeepAlive, reboot→restore, aggregate
  death→supervisor.

VERIFIED: stall logic unit-tested; supervisor took ZERO actions over 14s (3+
ticks) on a healthy route (no thrashing) — the critical forget-it property.
NOT yet verified live: the actual coreaudiod-kill rebuild (needs sudo, untestable
in this sandbox) — will get its real test on the user's next driver reinstall.

24 tests green, 0 warnings.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Diffstat:
Mcrates/hydra-core/src/engine.rs | 138++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/hydrad/src/main.rs | 54++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/hydrad/src/server.rs | 9+--------
3 files changed, 186 insertions(+), 15 deletions(-)

diff --git a/crates/hydra-core/src/engine.rs b/crates/hydra-core/src/engine.rs @@ -10,6 +10,16 @@ use crate::config::Config; use crate::config::SavedRoute; use crate::ffi::shim::MonitorRoute; +/// Decide whether a route is dead from its IOProc callback counts across two supervisor +/// ticks. `prev == u64::MAX` means "not yet sampled" (just created) — never dead. Otherwise +/// the aggregate is dead iff the counter didn't advance: the IOProc is clocked by the +/// aggregate independently of whether the tapped app is making sound, so a frozen counter +/// means the aggregate stopped (coreaudiod restart / driver reinstall / device gone), NOT +/// that the app went quiet. Pure so it can be unit-tested without CoreAudio. +fn route_is_dead(prev: u64, now: u64) -> bool { + prev != u64::MAX && now == prev +} + struct Entry { route: MonitorRoute, /// Human-readable "app → output" label for display. @@ -21,6 +31,10 @@ struct Entry { source_count: usize, /// Target output device UID, or `None` for the system default. output_uid: Option<String>, + /// IOProc callback count at the previous health tick; `u64::MAX` = not yet sampled. + /// A route whose count stops advancing has lost its aggregate (coreaudiod restart / + /// driver reinstall) and is rebuilt by the supervisor. + last_callbacks: u64, } #[derive(Default)] @@ -54,6 +68,7 @@ impl Engine { bundle_ids: bundle_id.into_iter().collect(), source_count: 1, output_uid: output_uid.map(str::to_string), + last_callbacks: u64::MAX, }, ); Ok(id) @@ -73,7 +88,14 @@ impl Engine { let id = route.id().to_string(); self.routes.insert( id.clone(), - Entry { route, label, bundle_ids, source_count: pids.len(), output_uid: output_uid.map(str::to_string) }, + Entry { + route, + label, + bundle_ids, + source_count: pids.len(), + output_uid: output_uid.map(str::to_string), + last_callbacks: u64::MAX, + }, ); Ok(id) } @@ -211,4 +233,118 @@ impl Engine { } restored } + + /// Self-healing tick. Run on a timer by the daemon's supervisor. Returns a list of + /// human-readable actions taken (empty = nothing to do), so the daemon can log them. + /// + /// Two jobs: + /// 1. **Rebuild dead routes.** A route whose IOProc callback count stopped advancing has + /// lost its aggregate device (coreaudiod restart / driver reinstall / device vanished). + /// The aggregate is clocked independently of whether the tapped app is making sound, + /// so a frozen counter means "dead", not "app is silent" — safe to rebuild. We tear it + /// down and re-establish it with the same params (re-resolving the app's PID). + /// 2. **Resurrect saved routes.** Any route in `saved` whose app is now running but which + /// isn't currently live gets started (handles the app being quit + relaunched). + /// + /// `resolve` maps a bundle id → a currently-running PID (or None). + pub fn reconcile( + &mut self, + saved: &[SavedRoute], + resolve: impl Fn(&str) -> Option<i32>, + ) -> Vec<String> { + let mut actions = Vec::new(); + + // ── 1. Find dead routes (callbacks frozen since last tick) and rebuild them. ── + let mut dead: Vec<(String, SavedRoute)> = Vec::new(); + for (id, e) in self.routes.iter_mut() { + let now = e.route.callbacks(); + let prev = e.last_callbacks; + e.last_callbacks = now; + // Only single-source routes carry enough persisted state to rebuild cleanly. + if route_is_dead(prev, now) && e.source_count == 1 { + if let Some(bundle) = e.bundle_ids.first() { + dead.push(( + id.clone(), + SavedRoute { + bundle_id: bundle.clone(), + output_uid: e.output_uid.clone(), + gain: e.route.gain(), + muted: e.route.muted(), + }, + )); + } + } + } + for (id, saved_route) in dead { + self.routes.remove(&id); // tears down the dead CoreAudio objects via Drop + if let Some(pid) = resolve(&saved_route.bundle_id) { + let label = format!( + "{} → {}", + saved_route.bundle_id, + saved_route.output_uid.as_deref().unwrap_or("Default Output") + ); + match self.start_monitor( + pid, + saved_route.output_uid.as_deref(), + saved_route.gain, + label, + Some(saved_route.bundle_id.clone()), + ) { + Ok(new_id) => { + if saved_route.muted { + self.set_muted(&new_id, true); + } + actions.push(format!("rebuilt dead route for {}", saved_route.bundle_id)); + } + Err(e) => actions.push(format!("failed to rebuild {}: {e}", saved_route.bundle_id)), + } + } else { + actions.push(format!("route for {} lost (app not running)", saved_route.bundle_id)); + } + } + + // ── 2. Resurrect saved routes whose app is running but which aren't live. ── + let live_bundles: std::collections::HashSet<&str> = + self.routes.values().flat_map(|e| e.bundle_ids.iter().map(String::as_str)).collect(); + let to_start: Vec<SavedRoute> = saved + .iter() + .filter(|s| !live_bundles.contains(s.bundle_id.as_str())) + .filter(|s| resolve(&s.bundle_id).is_some()) + .cloned() + .collect(); + for s in to_start { + let n = self.apply_routes(std::slice::from_ref(&s), &resolve); + if n > 0 { + actions.push(format!("started saved route for {} (app came back)", s.bundle_id)); + } + } + + actions + } +} + +#[cfg(test)] +mod tests { + use super::route_is_dead; + + #[test] + fn freshly_created_route_is_never_dead() { + // prev == MAX means "not yet sampled" — must not be flagged dead on its first tick. + assert!(!route_is_dead(u64::MAX, 0)); + assert!(!route_is_dead(u64::MAX, 5000)); + } + + #[test] + fn advancing_counter_is_alive() { + // Healthy aggregate: IOProc keeps firing, count climbs — even if the app is silent. + assert!(!route_is_dead(100, 220)); + assert!(!route_is_dead(0, 1)); + } + + #[test] + fn frozen_counter_is_dead() { + // Aggregate stopped clocking (coreaudiod restart / device gone): count didn't move. + assert!(route_is_dead(500, 500)); + assert!(route_is_dead(0, 0)); + } } diff --git a/crates/hydrad/src/main.rs b/crates/hydrad/src/main.rs @@ -42,15 +42,19 @@ fn main() -> Result<(), Box<dyn Error>> { // Resolve each saved app's bundle id to a currently-running PID; skip apps not running. let config = Config::load(); if !config.routes.is_empty() { - let restored = engine.lock().unwrap().restore(&config, |bundle_id| { - process::list_audio_processes() - .into_iter() - .find(|a| a.bundle_id.as_deref() == Some(bundle_id)) - .map(|a| a.pid) - }); + let restored = engine.lock().unwrap().restore(&config, resolve_bundle_to_pid); eprintln!("restored {restored}/{} saved route(s)", config.routes.len()); } + // Supervisor: the self-healing heart. Every few seconds it reconciles live routes against + // the saved config — rebuilding routes whose aggregate died (coreaudiod restart, driver + // reinstall) and resurrecting saved routes whose app came back. This is what makes Hydra + // "set it and forget it": you create a route once, the daemon keeps it alive forever. + { + let engine = Arc::clone(&engine); + std::thread::spawn(move || supervise(engine)); + } + for conn in listener.incoming() { match conn { Ok(stream) => { @@ -68,6 +72,44 @@ fn main() -> Result<(), Box<dyn Error>> { Ok(()) } +/// Resolve an app bundle id to a currently-running audio-process PID (None if not playing). +pub(crate) fn resolve_bundle_to_pid(bundle_id: &str) -> Option<i32> { + process::list_audio_processes() + .into_iter() + .find(|a| a.bundle_id.as_deref() == Some(bundle_id)) + .map(|a| a.pid) +} + +/// The supervisor loop: reconcile live routes against the saved config on a timer, forever. +/// Holds the engine lock only for the brief reconcile call, never across the sleep. +fn supervise(engine: Arc<Mutex<Engine>>) { + use std::time::Duration; + // 4s: long enough that a healthy route logs several IOProc callbacks between ticks (so a + // frozen counter unambiguously means "aggregate dead"), short enough to feel instant. + const TICK: Duration = Duration::from_secs(4); + loop { + std::thread::sleep(TICK); + let saved = Config::load().routes; + let actions = { + let mut eng = engine.lock().unwrap(); + eng.reconcile(&saved, resolve_bundle_to_pid) + }; + if !actions.is_empty() { + for a in &actions { + eprintln!("[supervisor] {a}"); + } + // Persist the healed state + nudge the SketchyBar widget. + let cfg = engine.lock().unwrap().to_config(); + cfg.save(); + let _ = std::process::Command::new("sketchybar") + .args(["--trigger", "hydra_route_change"]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn(); + } + } +} + /// Probe an existing socket: returns the protocol version if a live daemon answers `Ping`, /// or `None` if nothing's there / it's a stale socket. A short timeout keeps a wedged peer /// from hanging startup. diff --git a/crates/hydrad/src/server.rs b/crates/hydrad/src/server.rs @@ -27,14 +27,7 @@ fn default_recording_path(route_id: &str) -> std::path::PathBuf { dir } -/// Resolve an app bundle id to a currently-running audio-process PID, or None if it isn't -/// playing. Shared by startup-restore and preset-apply. -fn resolve_bundle_to_pid(bundle_id: &str) -> Option<i32> { - process::list_audio_processes() - .into_iter() - .find(|a| a.bundle_id.as_deref() == Some(bundle_id)) - .map(|a| a.pid) -} +use crate::resolve_bundle_to_pid; /// Stable UID for Hydra's single virtual device. The forked driver keeps a fixed UID /// (`Hydra_UID`); only the display name is manifest-driven for now.