engine.rs (14489B)
1 //! The live routing engine: owns every active [`MonitorRoute`] and projects them into 2 //! wire snapshots. The daemon holds one of these behind a `Mutex`. macOS-only. 3 4 use std::collections::HashMap; 5 6 use anyhow::Result; 7 use hydra_ipc::{RouteSummary, StateSnapshot}; 8 9 use crate::config::Config; 10 use crate::config::SavedRoute; 11 use crate::ffi::shim::MonitorRoute; 12 13 /// Decide whether a route is dead from its IOProc callback counts across two supervisor 14 /// ticks. `prev == u64::MAX` means "not yet sampled" (just created) — never dead. Otherwise 15 /// the aggregate is dead iff the counter didn't advance: the IOProc is clocked by the 16 /// aggregate independently of whether the tapped app is making sound, so a frozen counter 17 /// means the aggregate stopped (coreaudiod restart / driver reinstall / device gone), NOT 18 /// that the app went quiet. Pure so it can be unit-tested without CoreAudio. 19 fn route_is_dead(prev: u64, now: u64) -> bool { 20 prev != u64::MAX && now == prev 21 } 22 23 struct Entry { 24 route: MonitorRoute, 25 /// Human-readable "app → output" label for display. 26 label: String, 27 /// Bundle ids of the captured app(s) — the stable key(s) we persist (PIDs don't survive 28 /// restarts). Single-source routes have one; combined routes have several. 29 bundle_ids: Vec<String>, 30 /// Number of sources mixed into this route. 31 source_count: usize, 32 /// Target output device UID, or `None` for the system default. 33 output_uid: Option<String>, 34 /// IOProc callback count at the previous health tick; `u64::MAX` = not yet sampled. 35 /// A route whose count stops advancing has lost its aggregate (coreaudiod restart / 36 /// driver reinstall) and is rebuilt by the supervisor. 37 last_callbacks: u64, 38 } 39 40 #[derive(Default)] 41 pub struct Engine { 42 routes: HashMap<String, Entry>, 43 } 44 45 impl Engine { 46 pub fn new() -> Self { 47 Self::default() 48 } 49 50 /// Start monitoring `pid` to `output_uid` (default output if `None`). `label` is the 51 /// display string the TUI/snapshot shows; `bundle_id` is persisted so the route can be 52 /// re-established (by re-resolving the app) after a daemon restart. Returns the route id. 53 pub fn start_monitor( 54 &mut self, 55 pid: i32, 56 output_uid: Option<&str>, 57 gain: f32, 58 label: String, 59 bundle_id: Option<String>, 60 ) -> Result<String> { 61 let route = MonitorRoute::start(pid, output_uid, gain)?; 62 let id = route.id().to_string(); 63 self.routes.insert( 64 id.clone(), 65 Entry { 66 route, 67 label, 68 bundle_ids: bundle_id.into_iter().collect(), 69 source_count: 1, 70 output_uid: output_uid.map(str::to_string), 71 last_callbacks: u64::MAX, 72 }, 73 ); 74 Ok(id) 75 } 76 77 /// Combine several PIDs into one mixed route. `bundle_ids` are the persistable keys for 78 /// the sources (any that lack a bundle id are simply omitted from persistence). 79 pub fn start_combined( 80 &mut self, 81 pids: &[i32], 82 output_uid: Option<&str>, 83 gain: f32, 84 label: String, 85 bundle_ids: Vec<String>, 86 ) -> Result<String> { 87 let route = MonitorRoute::start_combined(pids, output_uid, gain)?; 88 let id = route.id().to_string(); 89 self.routes.insert( 90 id.clone(), 91 Entry { 92 route, 93 label, 94 bundle_ids, 95 source_count: pids.len(), 96 output_uid: output_uid.map(str::to_string), 97 last_callbacks: u64::MAX, 98 }, 99 ); 100 Ok(id) 101 } 102 103 /// Route a hardware input device (by UID, e.g. the MacBook mic) to an output. No tap/TCC. 104 pub fn start_input( 105 &mut self, 106 input_uid: &str, 107 output_uid: Option<&str>, 108 gain: f32, 109 label: String, 110 ) -> Result<String> { 111 let route = MonitorRoute::start_input(input_uid, output_uid, gain)?; 112 let id = route.id().to_string(); 113 self.routes.insert( 114 id.clone(), 115 Entry { 116 route, 117 label, 118 bundle_ids: Vec::new(), // hardware input has no bundle id 119 source_count: 1, 120 output_uid: output_uid.map(str::to_string), 121 last_callbacks: u64::MAX, 122 }, 123 ); 124 Ok(id) 125 } 126 127 /// Stop and tear down a route. Returns whether it existed. 128 pub fn stop(&mut self, id: &str) -> bool { 129 self.routes.remove(id).is_some() 130 } 131 132 /// Diagnostic: (route id, IOProc callback count, peak) for every live route. 133 pub fn debug_callbacks(&self) -> Vec<(String, u64, f32)> { 134 self.routes 135 .values() 136 .map(|e| (e.route.id().to_string(), e.route.callbacks(), e.route.peak())) 137 .collect() 138 } 139 140 pub fn set_gain(&mut self, id: &str, gain: f32) -> bool { 141 match self.routes.get_mut(id) { 142 Some(e) => { 143 e.route.set_gain(gain); 144 true 145 } 146 None => false, 147 } 148 } 149 150 pub fn set_muted(&mut self, id: &str, muted: bool) -> bool { 151 match self.routes.get_mut(id) { 152 Some(e) => { 153 e.route.set_muted(muted); 154 true 155 } 156 None => false, 157 } 158 } 159 160 /// Start recording route `id` to `path`. Err if no such route or the route refuses 161 /// (already recording / tap format not ready). 162 pub fn start_recording(&mut self, id: &str, path: std::path::PathBuf) -> Result<()> { 163 match self.routes.get_mut(id) { 164 Some(e) => e.route.start_recording(path), 165 None => anyhow::bail!("no such route: {id}"), 166 } 167 } 168 169 /// Stop recording route `id`; returns frames written. 170 pub fn stop_recording(&mut self, id: &str) -> Result<u64> { 171 match self.routes.get_mut(id) { 172 Some(e) => e.route.stop_recording(), 173 None => anyhow::bail!("no such route: {id}"), 174 } 175 } 176 177 /// Whether route `id` is currently recording. 178 pub fn is_recording(&self, id: &str) -> bool { 179 self.routes.get(id).map(|e| e.route.is_recording()).unwrap_or(false) 180 } 181 182 /// The file route `id` is recording to, as a display string (empty if not recording). 183 pub fn recording_path(&self, id: &str) -> Option<String> { 184 self.routes.get(id).and_then(|e| e.route.recording_path()).map(|p| p.display().to_string()) 185 } 186 187 /// Project current routes into a wire snapshot. Meters are sampled live here. 188 pub fn snapshot(&self) -> StateSnapshot { 189 let mut routes: Vec<RouteSummary> = self 190 .routes 191 .values() 192 .map(|e| RouteSummary { 193 id: e.route.id().to_string(), 194 target: e.label.clone(), 195 source_count: e.source_count, 196 active: true, 197 gain: e.route.gain(), 198 muted: e.route.muted(), 199 peak: e.route.peak(), 200 recording: e.route.is_recording(), 201 }) 202 .collect(); 203 routes.sort_by(|a, b| a.id.cmp(&b.id)); 204 205 StateSnapshot { daemon_version: crate::VERSION.to_string(), devices: Vec::new(), routes } 206 } 207 208 /// Project the live routes into a persistable [`Config`]. Routes whose source app has no 209 /// bundle id (anonymous helper processes) are skipped — there's no stable way to restore 210 /// them after a restart. 211 pub fn to_config(&self) -> Config { 212 // Persist only single-source routes for now: a combined route would need its full 213 // bundle-id list re-resolved atomically at restore, which we don't model yet. 214 let mut routes: Vec<SavedRoute> = self 215 .routes 216 .values() 217 .filter(|e| e.source_count == 1) 218 .filter_map(|e| { 219 e.bundle_ids.first().map(|b| SavedRoute { 220 bundle_id: b.clone(), 221 output_uid: e.output_uid.clone(), 222 gain: e.route.gain(), 223 muted: e.route.muted(), 224 }) 225 }) 226 .collect(); 227 routes.sort_by(|a, b| a.bundle_id.cmp(&b.bundle_id)); 228 Config { version: crate::config::CONFIG_VERSION, routes } 229 } 230 231 /// Re-establish saved routes from a [`Config`] (daemon startup). See [`Engine::apply_routes`]. 232 pub fn restore(&mut self, config: &Config, resolve: impl Fn(&str) -> Option<i32>) -> usize { 233 self.apply_routes(&config.routes, resolve) 234 } 235 236 /// Tear down every live route. 237 pub fn clear_all(&mut self) { 238 self.routes.clear(); 239 } 240 241 /// Establish a set of [`SavedRoute`]s, resolving each app's bundle id to a live PID via 242 /// `resolve` (apps not currently running are skipped). Returns how many were established. 243 /// Shared by startup-restore and preset-apply. 244 pub fn apply_routes(&mut self, routes: &[SavedRoute], resolve: impl Fn(&str) -> Option<i32>) -> usize { 245 let mut restored = 0; 246 for saved in routes { 247 let Some(pid) = resolve(&saved.bundle_id) else { continue }; 248 let label = format!("{} → {}", saved.bundle_id, saved.output_uid.as_deref().unwrap_or("Default Output")); 249 if let Ok(id) = 250 self.start_monitor(pid, saved.output_uid.as_deref(), saved.gain, label, Some(saved.bundle_id.clone())) 251 { 252 if saved.muted { 253 self.set_muted(&id, true); 254 } 255 restored += 1; 256 } 257 } 258 restored 259 } 260 261 /// Self-healing tick. Run on a timer by the daemon's supervisor. Returns a list of 262 /// human-readable actions taken (empty = nothing to do), so the daemon can log them. 263 /// 264 /// Two jobs: 265 /// 1. **Rebuild dead routes.** A route whose IOProc callback count stopped advancing has 266 /// lost its aggregate device (coreaudiod restart / driver reinstall / device vanished). 267 /// The aggregate is clocked independently of whether the tapped app is making sound, 268 /// so a frozen counter means "dead", not "app is silent" — safe to rebuild. We tear it 269 /// down and re-establish it with the same params (re-resolving the app's PID). 270 /// 2. **Resurrect saved routes.** Any route in `saved` whose app is now running but which 271 /// isn't currently live gets started (handles the app being quit + relaunched). 272 /// 273 /// `resolve` maps a bundle id → a currently-running PID (or None). 274 pub fn reconcile( 275 &mut self, 276 saved: &[SavedRoute], 277 resolve: impl Fn(&str) -> Option<i32>, 278 ) -> Vec<String> { 279 let mut actions = Vec::new(); 280 281 // ── 1. Find dead routes (callbacks frozen since last tick) and rebuild them. ── 282 let mut dead: Vec<(String, SavedRoute)> = Vec::new(); 283 for (id, e) in self.routes.iter_mut() { 284 let now = e.route.callbacks(); 285 let prev = e.last_callbacks; 286 e.last_callbacks = now; 287 // Only single-source routes carry enough persisted state to rebuild cleanly. 288 if route_is_dead(prev, now) && e.source_count == 1 { 289 if let Some(bundle) = e.bundle_ids.first() { 290 dead.push(( 291 id.clone(), 292 SavedRoute { 293 bundle_id: bundle.clone(), 294 output_uid: e.output_uid.clone(), 295 gain: e.route.gain(), 296 muted: e.route.muted(), 297 }, 298 )); 299 } 300 } 301 } 302 for (id, saved_route) in dead { 303 self.routes.remove(&id); // tears down the dead CoreAudio objects via Drop 304 if let Some(pid) = resolve(&saved_route.bundle_id) { 305 let label = format!( 306 "{} → {}", 307 saved_route.bundle_id, 308 saved_route.output_uid.as_deref().unwrap_or("Default Output") 309 ); 310 match self.start_monitor( 311 pid, 312 saved_route.output_uid.as_deref(), 313 saved_route.gain, 314 label, 315 Some(saved_route.bundle_id.clone()), 316 ) { 317 Ok(new_id) => { 318 if saved_route.muted { 319 self.set_muted(&new_id, true); 320 } 321 actions.push(format!("rebuilt dead route for {}", saved_route.bundle_id)); 322 } 323 Err(e) => actions.push(format!("failed to rebuild {}: {e}", saved_route.bundle_id)), 324 } 325 } else { 326 actions.push(format!("route for {} lost (app not running)", saved_route.bundle_id)); 327 } 328 } 329 330 // ── 2. Resurrect saved routes whose app is running but which aren't live. ── 331 let live_bundles: std::collections::HashSet<&str> = 332 self.routes.values().flat_map(|e| e.bundle_ids.iter().map(String::as_str)).collect(); 333 let to_start: Vec<SavedRoute> = saved 334 .iter() 335 .filter(|s| !live_bundles.contains(s.bundle_id.as_str())) 336 .filter(|s| resolve(&s.bundle_id).is_some()) 337 .cloned() 338 .collect(); 339 for s in to_start { 340 let n = self.apply_routes(std::slice::from_ref(&s), &resolve); 341 if n > 0 { 342 actions.push(format!("started saved route for {} (app came back)", s.bundle_id)); 343 } 344 } 345 346 actions 347 } 348 } 349 350 #[cfg(test)] 351 mod tests { 352 use super::route_is_dead; 353 354 #[test] 355 fn freshly_created_route_is_never_dead() { 356 // prev == MAX means "not yet sampled" — must not be flagged dead on its first tick. 357 assert!(!route_is_dead(u64::MAX, 0)); 358 assert!(!route_is_dead(u64::MAX, 5000)); 359 } 360 361 #[test] 362 fn advancing_counter_is_alive() { 363 // Healthy aggregate: IOProc keeps firing, count climbs — even if the app is silent. 364 assert!(!route_is_dead(100, 220)); 365 assert!(!route_is_dead(0, 1)); 366 } 367 368 #[test] 369 fn frozen_counter_is_dead() { 370 // Aggregate stopped clocking (coreaudiod restart / device gone): count didn't move. 371 assert!(route_is_dead(500, 500)); 372 assert!(route_is_dead(0, 0)); 373 } 374 }