server.rs (12152B)
1 //! Per-connection request handling. Each accepted socket runs on its own thread and 2 //! processes a stream of NDJSON [`Command`]s, replying with a [`Response`] to each. 3 4 use std::error::Error; 5 use std::io::{BufReader, BufWriter}; 6 use std::os::unix::net::UnixStream; 7 use std::sync::{Arc, Mutex}; 8 9 use hydra_core::engine::Engine; 10 use hydra_core::ffi::{hal, process}; 11 use hydra_core::manifest::{Manifest, ManifestDevice}; 12 use hydra_core::presets::PresetStore; 13 use hydra_ipc::{read_msg, write_msg, Command, Response, PROTOCOL_VERSION}; 14 15 type SharedEngine = Arc<Mutex<Engine>>; 16 17 /// Default recording destination: `~/Music/Hydra/hydra-<route>-<unixsecs>.wav`. 18 fn default_recording_path(route_id: &str) -> std::path::PathBuf { 19 let secs = std::time::SystemTime::now() 20 .duration_since(std::time::UNIX_EPOCH) 21 .map(|d| d.as_secs()) 22 .unwrap_or(0); 23 let mut dir = dirs::audio_dir().unwrap_or_else(|| std::path::PathBuf::from("/tmp")); 24 dir.push("Hydra"); 25 let _ = std::fs::create_dir_all(&dir); 26 dir.push(format!("hydra-{route_id}-{secs}.wav")); 27 dir 28 } 29 30 use crate::resolve_bundle_to_pid; 31 32 /// Stable UID for Hydra's single virtual device. The forked driver keeps a fixed UID 33 /// (`Hydra_UID`); only the display name is manifest-driven for now. 34 const HYDRA_DEVICE_UID: &str = "Hydra_UID"; 35 /// The driver's built-in channel count (compile-time). Advisory in the manifest until the 36 /// driver's channel count is itself made dynamic (the larger P3b refactor). 37 const HYDRA_DEVICE_CHANNELS: u32 = 16; 38 39 /// Rename the Hydra virtual device by writing the driver manifest. The new name is picked 40 /// up when coreaudiod next loads the driver (restart), not live. 41 fn set_device_name(name: &str) -> Response { 42 let name = name.trim(); 43 if name.is_empty() { 44 return Response::Error("device name cannot be empty".into()); 45 } 46 let manifest = Manifest::new(vec![ManifestDevice { 47 uid: HYDRA_DEVICE_UID.to_string(), 48 name: name.to_string(), 49 channels: HYDRA_DEVICE_CHANNELS, 50 sample_rates: vec![], 51 }]); 52 match manifest.write() { 53 Ok(()) => Response::Ok, 54 // The manifest dir lives under /Library and is created by install-driver.sh; if the 55 // daemon can't write it, the driver isn't installed yet — say so, don't crash. 56 Err(e) => Response::Error(format!( 57 "could not write {} ({e}); is the driver installed? (run scripts/install-driver.sh)", 58 Manifest::path().display() 59 )), 60 } 61 } 62 63 /// Persist the engine's current routes after any mutation, so they survive a restart. 64 fn persist(engine: &SharedEngine) { 65 engine.lock().unwrap().to_config().save(); 66 } 67 68 pub fn handle(stream: UnixStream, engine: SharedEngine) -> Result<(), Box<dyn Error>> { 69 let mut reader = BufReader::new(stream.try_clone()?); 70 let mut writer = BufWriter::new(stream); 71 72 while let Some(cmd) = read_msg::<_, Command>(&mut reader)? { 73 let resp = dispatch(cmd, &engine, &mut writer)?; 74 write_msg(&mut writer, &resp)?; 75 } 76 77 Ok(()) 78 } 79 80 fn dispatch( 81 cmd: Command, 82 engine: &SharedEngine, 83 writer: &mut BufWriter<UnixStream>, 84 ) -> Result<Response, Box<dyn Error>> { 85 Ok(match cmd { 86 Command::Ping => Response::Pong { version: PROTOCOL_VERSION }, 87 Command::GetState => { 88 let eng = engine.lock().unwrap(); 89 if std::env::var_os("HYDRA_DEBUG").is_some() { 90 for (id, cb, pk) in eng.debug_callbacks() { 91 eprintln!("[diag] route {id}: callbacks={cb} peak={pk:.4}"); 92 } 93 } 94 Response::State(eng.snapshot()) 95 } 96 Command::ListDevices => match hal::list_devices() { 97 Ok(devices) => Response::Devices(devices), 98 Err(e) => Response::Error(format!("device enumeration failed: {e}")), 99 }, 100 Command::ListApps => Response::Apps(process::list_audio_processes()), 101 Command::StartMonitor { pid, output_uid, gain } => { 102 let (label, bundle_id) = route_label(pid, output_uid.as_deref()); 103 let resp = 104 match engine.lock().unwrap().start_monitor(pid, output_uid.as_deref(), gain, label, bundle_id) { 105 Ok(id) => Response::RouteStarted { id }, 106 Err(e) => Response::Error(e.to_string()), 107 }; 108 persist(engine); 109 notify_route_change(); 110 resp 111 } 112 Command::StartCombined { pids, output_uid, gain } => { 113 let (label, bundle_ids) = combined_label(&pids, output_uid.as_deref()); 114 let resp = match engine 115 .lock() 116 .unwrap() 117 .start_combined(&pids, output_uid.as_deref(), gain, label, bundle_ids) 118 { 119 Ok(id) => Response::RouteStarted { id }, 120 Err(e) => Response::Error(e.to_string()), 121 }; 122 persist(engine); 123 notify_route_change(); 124 resp 125 } 126 Command::StartInput { input_uid, output_uid, gain } => { 127 let label = input_label(&input_uid, output_uid.as_deref()); 128 let resp = match engine.lock().unwrap().start_input(&input_uid, output_uid.as_deref(), gain, label) { 129 Ok(id) => Response::RouteStarted { id }, 130 Err(e) => Response::Error(e.to_string()), 131 }; 132 // Hardware-input routes aren't persisted yet (no bundle id); skip persist(). 133 notify_route_change(); 134 resp 135 } 136 Command::StopRoute { id } => { 137 let r = ok_or_missing(engine.lock().unwrap().stop(&id), &id); 138 persist(engine); 139 notify_route_change(); 140 r 141 } 142 Command::SetGain { id, gain } => { 143 let r = ok_or_missing(engine.lock().unwrap().set_gain(&id, gain), &id); 144 persist(engine); 145 r 146 } 147 Command::SetMute { id, muted } => { 148 let r = ok_or_missing(engine.lock().unwrap().set_muted(&id, muted), &id); 149 persist(engine); 150 notify_route_change(); 151 r 152 } 153 Command::SetDeviceName { name } => set_device_name(&name), 154 Command::ListPresets => Response::Presets(PresetStore::load().names()), 155 Command::SavePreset { name } => { 156 let name = name.trim().to_string(); 157 if name.is_empty() { 158 Response::Error("preset name cannot be empty".into()) 159 } else { 160 let routes = engine.lock().unwrap().to_config().routes; 161 let mut store = PresetStore::load(); 162 store.upsert(&name, routes); 163 store.save(); 164 Response::Ok 165 } 166 } 167 Command::ApplyPreset { name } => { 168 let store = PresetStore::load(); 169 match store.get(&name) { 170 Some(preset) => { 171 let total = preset.routes.len(); 172 let routes = preset.routes.clone(); 173 let restored = { 174 let mut eng = engine.lock().unwrap(); 175 eng.clear_all(); 176 eng.apply_routes(&routes, resolve_bundle_to_pid) 177 }; 178 persist(engine); 179 notify_route_change(); 180 Response::PresetApplied { restored, total } 181 } 182 None => Response::Error(format!("no preset named {name:?}")), 183 } 184 } 185 Command::DeletePreset { name } => { 186 let mut store = PresetStore::load(); 187 if store.remove(&name) { 188 store.save(); 189 Response::Ok 190 } else { 191 Response::Error(format!("no preset named {name:?}")) 192 } 193 } 194 Command::StartRecording { id, path } => { 195 let path = path.map(std::path::PathBuf::from).unwrap_or_else(|| default_recording_path(&id)); 196 let resp = match engine.lock().unwrap().start_recording(&id, path.clone()) { 197 Ok(()) => Response::RecordingStarted { path: path.display().to_string() }, 198 Err(e) => Response::Error(e.to_string()), 199 }; 200 notify_route_change(); 201 resp 202 } 203 Command::StopRecording { id } => { 204 let path = engine.lock().unwrap().recording_path(&id); 205 let resp = match engine.lock().unwrap().stop_recording(&id) { 206 Ok(frames) => Response::RecordingStopped { 207 path: path.unwrap_or_default(), 208 frames, 209 }, 210 Err(e) => Response::Error(e.to_string()), 211 }; 212 notify_route_change(); 213 resp 214 } 215 // Server-push subscription lands in P4; acknowledge for now. 216 Command::Subscribe => Response::Ok, 217 Command::Shutdown => { 218 write_msg(writer, &Response::Ok)?; 219 std::process::exit(0); 220 } 221 }) 222 } 223 224 fn ok_or_missing(found: bool, id: &str) -> Response { 225 if found { 226 Response::Ok 227 } else { 228 Response::Error(format!("no such route: {id}")) 229 } 230 } 231 232 /// Best-effort: poke the SketchyBar widget so it refreshes immediately on a route change. 233 /// Silently does nothing if SketchyBar isn't installed. 234 fn notify_route_change() { 235 let _ = std::process::Command::new("sketchybar") 236 .args(["--trigger", "hydra_route_change"]) 237 .stdout(std::process::Stdio::null()) 238 .stderr(std::process::Stdio::null()) 239 .spawn(); 240 } 241 242 /// Build the "app → output" display label for a new route, and resolve the app's bundle id 243 /// (the stable key persisted for restart). Returns `(label, bundle_id)`. 244 fn route_label(pid: i32, output_uid: Option<&str>) -> (String, Option<String>) { 245 let app = process::list_audio_processes().into_iter().find(|a| a.pid == pid); 246 let name = app.as_ref().map(|a| a.name.clone()).unwrap_or_else(|| format!("pid {pid}")); 247 let bundle_id = app.and_then(|a| a.bundle_id); 248 249 let output = match output_uid { 250 Some(uid) => hal::list_devices() 251 .ok() 252 .and_then(|ds| ds.into_iter().find(|d| d.uid == uid).map(|d| d.name)) 253 .unwrap_or_else(|| uid.to_string()), 254 None => "Default Output".to_string(), 255 }; 256 257 (format!("{name} → {output}"), bundle_id) 258 } 259 260 /// Display label for a hardware-input route, e.g. "MacBook Pro Microphone → Hydra". 261 fn input_label(input_uid: &str, output_uid: Option<&str>) -> String { 262 let devs = hal::list_devices().unwrap_or_default(); 263 let name = |uid: &str| devs.iter().find(|d| d.uid == uid).map(|d| d.name.clone()); 264 let src = name(input_uid).unwrap_or_else(|| input_uid.to_string()); 265 let dest = match output_uid { 266 Some(uid) => name(uid).unwrap_or_else(|| uid.to_string()), 267 None => "Default Output".to_string(), 268 }; 269 format!("{src} → {dest}") 270 } 271 272 /// Build the label + bundle-id list for a combined (multi-source) route. 273 /// Label reads like "Safari + Spotify +1 → Hydra". 274 fn combined_label(pids: &[i32], output_uid: Option<&str>) -> (String, Vec<String>) { 275 let apps = process::list_audio_processes(); 276 let names: Vec<String> = pids 277 .iter() 278 .map(|&pid| { 279 apps.iter().find(|a| a.pid == pid).map(|a| a.name.clone()).unwrap_or_else(|| format!("pid {pid}")) 280 }) 281 .collect(); 282 let bundle_ids: Vec<String> = pids 283 .iter() 284 .filter_map(|&pid| apps.iter().find(|a| a.pid == pid).and_then(|a| a.bundle_id.clone())) 285 .collect(); 286 287 let sources = match names.len() { 288 0 => "nothing".to_string(), 289 1 => names[0].clone(), 290 2 => format!("{} + {}", names[0], names[1]), 291 n => format!("{} + {} +{}", names[0], names[1], n - 2), 292 }; 293 294 let output = match output_uid { 295 Some(uid) => hal::list_devices() 296 .ok() 297 .and_then(|ds| ds.into_iter().find(|d| d.uid == uid).map(|d| d.name)) 298 .unwrap_or_else(|| uid.to_string()), 299 None => "Default Output".to_string(), 300 }; 301 302 (format!("{sources} → {output}"), bundle_ids) 303 }