hydra

Terminal replacement for Loopback — virtual audio devices and routing on macOS, from a ratatui TUI.
Log | Files | Refs | README | LICENSE

engine.rs (14489B)


      1 //! The live routing engine: owns every active [`MonitorRoute`] and projects them into
      2 //! wire snapshots. The daemon holds one of these behind a `Mutex`. macOS-only.
      3 
      4 use std::collections::HashMap;
      5 
      6 use anyhow::Result;
      7 use hydra_ipc::{RouteSummary, StateSnapshot};
      8 
      9 use crate::config::Config;
     10 use crate::config::SavedRoute;
     11 use crate::ffi::shim::MonitorRoute;
     12 
     13 /// Decide whether a route is dead from its IOProc callback counts across two supervisor
     14 /// ticks. `prev == u64::MAX` means "not yet sampled" (just created) — never dead. Otherwise
     15 /// the aggregate is dead iff the counter didn't advance: the IOProc is clocked by the
     16 /// aggregate independently of whether the tapped app is making sound, so a frozen counter
     17 /// means the aggregate stopped (coreaudiod restart / driver reinstall / device gone), NOT
     18 /// that the app went quiet. Pure so it can be unit-tested without CoreAudio.
     19 fn route_is_dead(prev: u64, now: u64) -> bool {
     20     prev != u64::MAX && now == prev
     21 }
     22 
     23 struct Entry {
     24     route: MonitorRoute,
     25     /// Human-readable "app → output" label for display.
     26     label: String,
     27     /// Bundle ids of the captured app(s) — the stable key(s) we persist (PIDs don't survive
     28     /// restarts). Single-source routes have one; combined routes have several.
     29     bundle_ids: Vec<String>,
     30     /// Number of sources mixed into this route.
     31     source_count: usize,
     32     /// Target output device UID, or `None` for the system default.
     33     output_uid: Option<String>,
     34     /// IOProc callback count at the previous health tick; `u64::MAX` = not yet sampled.
     35     /// A route whose count stops advancing has lost its aggregate (coreaudiod restart /
     36     /// driver reinstall) and is rebuilt by the supervisor.
     37     last_callbacks: u64,
     38 }
     39 
     40 #[derive(Default)]
     41 pub struct Engine {
     42     routes: HashMap<String, Entry>,
     43 }
     44 
     45 impl Engine {
     46     pub fn new() -> Self {
     47         Self::default()
     48     }
     49 
     50     /// Start monitoring `pid` to `output_uid` (default output if `None`). `label` is the
     51     /// display string the TUI/snapshot shows; `bundle_id` is persisted so the route can be
     52     /// re-established (by re-resolving the app) after a daemon restart. Returns the route id.
     53     pub fn start_monitor(
     54         &mut self,
     55         pid: i32,
     56         output_uid: Option<&str>,
     57         gain: f32,
     58         label: String,
     59         bundle_id: Option<String>,
     60     ) -> Result<String> {
     61         let route = MonitorRoute::start(pid, output_uid, gain)?;
     62         let id = route.id().to_string();
     63         self.routes.insert(
     64             id.clone(),
     65             Entry {
     66                 route,
     67                 label,
     68                 bundle_ids: bundle_id.into_iter().collect(),
     69                 source_count: 1,
     70                 output_uid: output_uid.map(str::to_string),
     71                 last_callbacks: u64::MAX,
     72             },
     73         );
     74         Ok(id)
     75     }
     76 
     77     /// Combine several PIDs into one mixed route. `bundle_ids` are the persistable keys for
     78     /// the sources (any that lack a bundle id are simply omitted from persistence).
     79     pub fn start_combined(
     80         &mut self,
     81         pids: &[i32],
     82         output_uid: Option<&str>,
     83         gain: f32,
     84         label: String,
     85         bundle_ids: Vec<String>,
     86     ) -> Result<String> {
     87         let route = MonitorRoute::start_combined(pids, output_uid, gain)?;
     88         let id = route.id().to_string();
     89         self.routes.insert(
     90             id.clone(),
     91             Entry {
     92                 route,
     93                 label,
     94                 bundle_ids,
     95                 source_count: pids.len(),
     96                 output_uid: output_uid.map(str::to_string),
     97                 last_callbacks: u64::MAX,
     98             },
     99         );
    100         Ok(id)
    101     }
    102 
    103     /// Route a hardware input device (by UID, e.g. the MacBook mic) to an output. No tap/TCC.
    104     pub fn start_input(
    105         &mut self,
    106         input_uid: &str,
    107         output_uid: Option<&str>,
    108         gain: f32,
    109         label: String,
    110     ) -> Result<String> {
    111         let route = MonitorRoute::start_input(input_uid, output_uid, gain)?;
    112         let id = route.id().to_string();
    113         self.routes.insert(
    114             id.clone(),
    115             Entry {
    116                 route,
    117                 label,
    118                 bundle_ids: Vec::new(), // hardware input has no bundle id
    119                 source_count: 1,
    120                 output_uid: output_uid.map(str::to_string),
    121                 last_callbacks: u64::MAX,
    122             },
    123         );
    124         Ok(id)
    125     }
    126 
    127     /// Stop and tear down a route. Returns whether it existed.
    128     pub fn stop(&mut self, id: &str) -> bool {
    129         self.routes.remove(id).is_some()
    130     }
    131 
    132     /// Diagnostic: (route id, IOProc callback count, peak) for every live route.
    133     pub fn debug_callbacks(&self) -> Vec<(String, u64, f32)> {
    134         self.routes
    135             .values()
    136             .map(|e| (e.route.id().to_string(), e.route.callbacks(), e.route.peak()))
    137             .collect()
    138     }
    139 
    140     pub fn set_gain(&mut self, id: &str, gain: f32) -> bool {
    141         match self.routes.get_mut(id) {
    142             Some(e) => {
    143                 e.route.set_gain(gain);
    144                 true
    145             }
    146             None => false,
    147         }
    148     }
    149 
    150     pub fn set_muted(&mut self, id: &str, muted: bool) -> bool {
    151         match self.routes.get_mut(id) {
    152             Some(e) => {
    153                 e.route.set_muted(muted);
    154                 true
    155             }
    156             None => false,
    157         }
    158     }
    159 
    160     /// Start recording route `id` to `path`. Err if no such route or the route refuses
    161     /// (already recording / tap format not ready).
    162     pub fn start_recording(&mut self, id: &str, path: std::path::PathBuf) -> Result<()> {
    163         match self.routes.get_mut(id) {
    164             Some(e) => e.route.start_recording(path),
    165             None => anyhow::bail!("no such route: {id}"),
    166         }
    167     }
    168 
    169     /// Stop recording route `id`; returns frames written.
    170     pub fn stop_recording(&mut self, id: &str) -> Result<u64> {
    171         match self.routes.get_mut(id) {
    172             Some(e) => e.route.stop_recording(),
    173             None => anyhow::bail!("no such route: {id}"),
    174         }
    175     }
    176 
    177     /// Whether route `id` is currently recording.
    178     pub fn is_recording(&self, id: &str) -> bool {
    179         self.routes.get(id).map(|e| e.route.is_recording()).unwrap_or(false)
    180     }
    181 
    182     /// The file route `id` is recording to, as a display string (empty if not recording).
    183     pub fn recording_path(&self, id: &str) -> Option<String> {
    184         self.routes.get(id).and_then(|e| e.route.recording_path()).map(|p| p.display().to_string())
    185     }
    186 
    187     /// Project current routes into a wire snapshot. Meters are sampled live here.
    188     pub fn snapshot(&self) -> StateSnapshot {
    189         let mut routes: Vec<RouteSummary> = self
    190             .routes
    191             .values()
    192             .map(|e| RouteSummary {
    193                 id: e.route.id().to_string(),
    194                 target: e.label.clone(),
    195                 source_count: e.source_count,
    196                 active: true,
    197                 gain: e.route.gain(),
    198                 muted: e.route.muted(),
    199                 peak: e.route.peak(),
    200                 recording: e.route.is_recording(),
    201             })
    202             .collect();
    203         routes.sort_by(|a, b| a.id.cmp(&b.id));
    204 
    205         StateSnapshot { daemon_version: crate::VERSION.to_string(), devices: Vec::new(), routes }
    206     }
    207 
    208     /// Project the live routes into a persistable [`Config`]. Routes whose source app has no
    209     /// bundle id (anonymous helper processes) are skipped — there's no stable way to restore
    210     /// them after a restart.
    211     pub fn to_config(&self) -> Config {
    212         // Persist only single-source routes for now: a combined route would need its full
    213         // bundle-id list re-resolved atomically at restore, which we don't model yet.
    214         let mut routes: Vec<SavedRoute> = self
    215             .routes
    216             .values()
    217             .filter(|e| e.source_count == 1)
    218             .filter_map(|e| {
    219                 e.bundle_ids.first().map(|b| SavedRoute {
    220                     bundle_id: b.clone(),
    221                     output_uid: e.output_uid.clone(),
    222                     gain: e.route.gain(),
    223                     muted: e.route.muted(),
    224                 })
    225             })
    226             .collect();
    227         routes.sort_by(|a, b| a.bundle_id.cmp(&b.bundle_id));
    228         Config { version: crate::config::CONFIG_VERSION, routes }
    229     }
    230 
    231     /// Re-establish saved routes from a [`Config`] (daemon startup). See [`Engine::apply_routes`].
    232     pub fn restore(&mut self, config: &Config, resolve: impl Fn(&str) -> Option<i32>) -> usize {
    233         self.apply_routes(&config.routes, resolve)
    234     }
    235 
    236     /// Tear down every live route.
    237     pub fn clear_all(&mut self) {
    238         self.routes.clear();
    239     }
    240 
    241     /// Establish a set of [`SavedRoute`]s, resolving each app's bundle id to a live PID via
    242     /// `resolve` (apps not currently running are skipped). Returns how many were established.
    243     /// Shared by startup-restore and preset-apply.
    244     pub fn apply_routes(&mut self, routes: &[SavedRoute], resolve: impl Fn(&str) -> Option<i32>) -> usize {
    245         let mut restored = 0;
    246         for saved in routes {
    247             let Some(pid) = resolve(&saved.bundle_id) else { continue };
    248             let label = format!("{} → {}", saved.bundle_id, saved.output_uid.as_deref().unwrap_or("Default Output"));
    249             if let Ok(id) =
    250                 self.start_monitor(pid, saved.output_uid.as_deref(), saved.gain, label, Some(saved.bundle_id.clone()))
    251             {
    252                 if saved.muted {
    253                     self.set_muted(&id, true);
    254                 }
    255                 restored += 1;
    256             }
    257         }
    258         restored
    259     }
    260 
    261     /// Self-healing tick. Run on a timer by the daemon's supervisor. Returns a list of
    262     /// human-readable actions taken (empty = nothing to do), so the daemon can log them.
    263     ///
    264     /// Two jobs:
    265     ///  1. **Rebuild dead routes.** A route whose IOProc callback count stopped advancing has
    266     ///     lost its aggregate device (coreaudiod restart / driver reinstall / device vanished).
    267     ///     The aggregate is clocked independently of whether the tapped app is making sound,
    268     ///     so a frozen counter means "dead", not "app is silent" — safe to rebuild. We tear it
    269     ///     down and re-establish it with the same params (re-resolving the app's PID).
    270     ///  2. **Resurrect saved routes.** Any route in `saved` whose app is now running but which
    271     ///     isn't currently live gets started (handles the app being quit + relaunched).
    272     ///
    273     /// `resolve` maps a bundle id → a currently-running PID (or None).
    274     pub fn reconcile(
    275         &mut self,
    276         saved: &[SavedRoute],
    277         resolve: impl Fn(&str) -> Option<i32>,
    278     ) -> Vec<String> {
    279         let mut actions = Vec::new();
    280 
    281         // ── 1. Find dead routes (callbacks frozen since last tick) and rebuild them. ──
    282         let mut dead: Vec<(String, SavedRoute)> = Vec::new();
    283         for (id, e) in self.routes.iter_mut() {
    284             let now = e.route.callbacks();
    285             let prev = e.last_callbacks;
    286             e.last_callbacks = now;
    287             // Only single-source routes carry enough persisted state to rebuild cleanly.
    288             if route_is_dead(prev, now) && e.source_count == 1 {
    289                 if let Some(bundle) = e.bundle_ids.first() {
    290                     dead.push((
    291                         id.clone(),
    292                         SavedRoute {
    293                             bundle_id: bundle.clone(),
    294                             output_uid: e.output_uid.clone(),
    295                             gain: e.route.gain(),
    296                             muted: e.route.muted(),
    297                         },
    298                     ));
    299                 }
    300             }
    301         }
    302         for (id, saved_route) in dead {
    303             self.routes.remove(&id); // tears down the dead CoreAudio objects via Drop
    304             if let Some(pid) = resolve(&saved_route.bundle_id) {
    305                 let label = format!(
    306                     "{} → {}",
    307                     saved_route.bundle_id,
    308                     saved_route.output_uid.as_deref().unwrap_or("Default Output")
    309                 );
    310                 match self.start_monitor(
    311                     pid,
    312                     saved_route.output_uid.as_deref(),
    313                     saved_route.gain,
    314                     label,
    315                     Some(saved_route.bundle_id.clone()),
    316                 ) {
    317                     Ok(new_id) => {
    318                         if saved_route.muted {
    319                             self.set_muted(&new_id, true);
    320                         }
    321                         actions.push(format!("rebuilt dead route for {}", saved_route.bundle_id));
    322                     }
    323                     Err(e) => actions.push(format!("failed to rebuild {}: {e}", saved_route.bundle_id)),
    324                 }
    325             } else {
    326                 actions.push(format!("route for {} lost (app not running)", saved_route.bundle_id));
    327             }
    328         }
    329 
    330         // ── 2. Resurrect saved routes whose app is running but which aren't live. ──
    331         let live_bundles: std::collections::HashSet<&str> =
    332             self.routes.values().flat_map(|e| e.bundle_ids.iter().map(String::as_str)).collect();
    333         let to_start: Vec<SavedRoute> = saved
    334             .iter()
    335             .filter(|s| !live_bundles.contains(s.bundle_id.as_str()))
    336             .filter(|s| resolve(&s.bundle_id).is_some())
    337             .cloned()
    338             .collect();
    339         for s in to_start {
    340             let n = self.apply_routes(std::slice::from_ref(&s), &resolve);
    341             if n > 0 {
    342                 actions.push(format!("started saved route for {} (app came back)", s.bundle_id));
    343             }
    344         }
    345 
    346         actions
    347     }
    348 }
    349 
    350 #[cfg(test)]
    351 mod tests {
    352     use super::route_is_dead;
    353 
    354     #[test]
    355     fn freshly_created_route_is_never_dead() {
    356         // prev == MAX means "not yet sampled" — must not be flagged dead on its first tick.
    357         assert!(!route_is_dead(u64::MAX, 0));
    358         assert!(!route_is_dead(u64::MAX, 5000));
    359     }
    360 
    361     #[test]
    362     fn advancing_counter_is_alive() {
    363         // Healthy aggregate: IOProc keeps firing, count climbs — even if the app is silent.
    364         assert!(!route_is_dead(100, 220));
    365         assert!(!route_is_dead(0, 1));
    366     }
    367 
    368     #[test]
    369     fn frozen_counter_is_dead() {
    370         // Aggregate stopped clocking (coreaudiod restart / device gone): count didn't move.
    371         assert!(route_is_dead(500, 500));
    372         assert!(route_is_dead(0, 0));
    373     }
    374 }