hydra

Terminal replacement for Loopback — virtual audio devices and routing on macOS, from a ratatui TUI.
Log | Files | Refs | README | LICENSE

server.rs (12152B)


      1 //! Per-connection request handling. Each accepted socket runs on its own thread and
      2 //! processes a stream of NDJSON [`Command`]s, replying with a [`Response`] to each.
      3 
      4 use std::error::Error;
      5 use std::io::{BufReader, BufWriter};
      6 use std::os::unix::net::UnixStream;
      7 use std::sync::{Arc, Mutex};
      8 
      9 use hydra_core::engine::Engine;
     10 use hydra_core::ffi::{hal, process};
     11 use hydra_core::manifest::{Manifest, ManifestDevice};
     12 use hydra_core::presets::PresetStore;
     13 use hydra_ipc::{read_msg, write_msg, Command, Response, PROTOCOL_VERSION};
     14 
     15 type SharedEngine = Arc<Mutex<Engine>>;
     16 
     17 /// Default recording destination: `~/Music/Hydra/hydra-<route>-<unixsecs>.wav`.
     18 fn default_recording_path(route_id: &str) -> std::path::PathBuf {
     19     let secs = std::time::SystemTime::now()
     20         .duration_since(std::time::UNIX_EPOCH)
     21         .map(|d| d.as_secs())
     22         .unwrap_or(0);
     23     let mut dir = dirs::audio_dir().unwrap_or_else(|| std::path::PathBuf::from("/tmp"));
     24     dir.push("Hydra");
     25     let _ = std::fs::create_dir_all(&dir);
     26     dir.push(format!("hydra-{route_id}-{secs}.wav"));
     27     dir
     28 }
     29 
     30 use crate::resolve_bundle_to_pid;
     31 
     32 /// Stable UID for Hydra's single virtual device. The forked driver keeps a fixed UID
     33 /// (`Hydra_UID`); only the display name is manifest-driven for now.
     34 const HYDRA_DEVICE_UID: &str = "Hydra_UID";
     35 /// The driver's built-in channel count (compile-time). Advisory in the manifest until the
     36 /// driver's channel count is itself made dynamic (the larger P3b refactor).
     37 const HYDRA_DEVICE_CHANNELS: u32 = 16;
     38 
     39 /// Rename the Hydra virtual device by writing the driver manifest. The new name is picked
     40 /// up when coreaudiod next loads the driver (restart), not live.
     41 fn set_device_name(name: &str) -> Response {
     42     let name = name.trim();
     43     if name.is_empty() {
     44         return Response::Error("device name cannot be empty".into());
     45     }
     46     let manifest = Manifest::new(vec![ManifestDevice {
     47         uid: HYDRA_DEVICE_UID.to_string(),
     48         name: name.to_string(),
     49         channels: HYDRA_DEVICE_CHANNELS,
     50         sample_rates: vec![],
     51     }]);
     52     match manifest.write() {
     53         Ok(()) => Response::Ok,
     54         // The manifest dir lives under /Library and is created by install-driver.sh; if the
     55         // daemon can't write it, the driver isn't installed yet — say so, don't crash.
     56         Err(e) => Response::Error(format!(
     57             "could not write {} ({e}); is the driver installed? (run scripts/install-driver.sh)",
     58             Manifest::path().display()
     59         )),
     60     }
     61 }
     62 
     63 /// Persist the engine's current routes after any mutation, so they survive a restart.
     64 fn persist(engine: &SharedEngine) {
     65     engine.lock().unwrap().to_config().save();
     66 }
     67 
     68 pub fn handle(stream: UnixStream, engine: SharedEngine) -> Result<(), Box<dyn Error>> {
     69     let mut reader = BufReader::new(stream.try_clone()?);
     70     let mut writer = BufWriter::new(stream);
     71 
     72     while let Some(cmd) = read_msg::<_, Command>(&mut reader)? {
     73         let resp = dispatch(cmd, &engine, &mut writer)?;
     74         write_msg(&mut writer, &resp)?;
     75     }
     76 
     77     Ok(())
     78 }
     79 
     80 fn dispatch(
     81     cmd: Command,
     82     engine: &SharedEngine,
     83     writer: &mut BufWriter<UnixStream>,
     84 ) -> Result<Response, Box<dyn Error>> {
     85     Ok(match cmd {
     86         Command::Ping => Response::Pong { version: PROTOCOL_VERSION },
     87         Command::GetState => {
     88             let eng = engine.lock().unwrap();
     89             if std::env::var_os("HYDRA_DEBUG").is_some() {
     90                 for (id, cb, pk) in eng.debug_callbacks() {
     91                     eprintln!("[diag] route {id}: callbacks={cb} peak={pk:.4}");
     92                 }
     93             }
     94             Response::State(eng.snapshot())
     95         }
     96         Command::ListDevices => match hal::list_devices() {
     97             Ok(devices) => Response::Devices(devices),
     98             Err(e) => Response::Error(format!("device enumeration failed: {e}")),
     99         },
    100         Command::ListApps => Response::Apps(process::list_audio_processes()),
    101         Command::StartMonitor { pid, output_uid, gain } => {
    102             let (label, bundle_id) = route_label(pid, output_uid.as_deref());
    103             let resp =
    104                 match engine.lock().unwrap().start_monitor(pid, output_uid.as_deref(), gain, label, bundle_id) {
    105                     Ok(id) => Response::RouteStarted { id },
    106                     Err(e) => Response::Error(e.to_string()),
    107                 };
    108             persist(engine);
    109             notify_route_change();
    110             resp
    111         }
    112         Command::StartCombined { pids, output_uid, gain } => {
    113             let (label, bundle_ids) = combined_label(&pids, output_uid.as_deref());
    114             let resp = match engine
    115                 .lock()
    116                 .unwrap()
    117                 .start_combined(&pids, output_uid.as_deref(), gain, label, bundle_ids)
    118             {
    119                 Ok(id) => Response::RouteStarted { id },
    120                 Err(e) => Response::Error(e.to_string()),
    121             };
    122             persist(engine);
    123             notify_route_change();
    124             resp
    125         }
    126         Command::StartInput { input_uid, output_uid, gain } => {
    127             let label = input_label(&input_uid, output_uid.as_deref());
    128             let resp = match engine.lock().unwrap().start_input(&input_uid, output_uid.as_deref(), gain, label) {
    129                 Ok(id) => Response::RouteStarted { id },
    130                 Err(e) => Response::Error(e.to_string()),
    131             };
    132             // Hardware-input routes aren't persisted yet (no bundle id); skip persist().
    133             notify_route_change();
    134             resp
    135         }
    136         Command::StopRoute { id } => {
    137             let r = ok_or_missing(engine.lock().unwrap().stop(&id), &id);
    138             persist(engine);
    139             notify_route_change();
    140             r
    141         }
    142         Command::SetGain { id, gain } => {
    143             let r = ok_or_missing(engine.lock().unwrap().set_gain(&id, gain), &id);
    144             persist(engine);
    145             r
    146         }
    147         Command::SetMute { id, muted } => {
    148             let r = ok_or_missing(engine.lock().unwrap().set_muted(&id, muted), &id);
    149             persist(engine);
    150             notify_route_change();
    151             r
    152         }
    153         Command::SetDeviceName { name } => set_device_name(&name),
    154         Command::ListPresets => Response::Presets(PresetStore::load().names()),
    155         Command::SavePreset { name } => {
    156             let name = name.trim().to_string();
    157             if name.is_empty() {
    158                 Response::Error("preset name cannot be empty".into())
    159             } else {
    160                 let routes = engine.lock().unwrap().to_config().routes;
    161                 let mut store = PresetStore::load();
    162                 store.upsert(&name, routes);
    163                 store.save();
    164                 Response::Ok
    165             }
    166         }
    167         Command::ApplyPreset { name } => {
    168             let store = PresetStore::load();
    169             match store.get(&name) {
    170                 Some(preset) => {
    171                     let total = preset.routes.len();
    172                     let routes = preset.routes.clone();
    173                     let restored = {
    174                         let mut eng = engine.lock().unwrap();
    175                         eng.clear_all();
    176                         eng.apply_routes(&routes, resolve_bundle_to_pid)
    177                     };
    178                     persist(engine);
    179                     notify_route_change();
    180                     Response::PresetApplied { restored, total }
    181                 }
    182                 None => Response::Error(format!("no preset named {name:?}")),
    183             }
    184         }
    185         Command::DeletePreset { name } => {
    186             let mut store = PresetStore::load();
    187             if store.remove(&name) {
    188                 store.save();
    189                 Response::Ok
    190             } else {
    191                 Response::Error(format!("no preset named {name:?}"))
    192             }
    193         }
    194         Command::StartRecording { id, path } => {
    195             let path = path.map(std::path::PathBuf::from).unwrap_or_else(|| default_recording_path(&id));
    196             let resp = match engine.lock().unwrap().start_recording(&id, path.clone()) {
    197                 Ok(()) => Response::RecordingStarted { path: path.display().to_string() },
    198                 Err(e) => Response::Error(e.to_string()),
    199             };
    200             notify_route_change();
    201             resp
    202         }
    203         Command::StopRecording { id } => {
    204             let path = engine.lock().unwrap().recording_path(&id);
    205             let resp = match engine.lock().unwrap().stop_recording(&id) {
    206                 Ok(frames) => Response::RecordingStopped {
    207                     path: path.unwrap_or_default(),
    208                     frames,
    209                 },
    210                 Err(e) => Response::Error(e.to_string()),
    211             };
    212             notify_route_change();
    213             resp
    214         }
    215         // Server-push subscription lands in P4; acknowledge for now.
    216         Command::Subscribe => Response::Ok,
    217         Command::Shutdown => {
    218             write_msg(writer, &Response::Ok)?;
    219             std::process::exit(0);
    220         }
    221     })
    222 }
    223 
    224 fn ok_or_missing(found: bool, id: &str) -> Response {
    225     if found {
    226         Response::Ok
    227     } else {
    228         Response::Error(format!("no such route: {id}"))
    229     }
    230 }
    231 
    232 /// Best-effort: poke the SketchyBar widget so it refreshes immediately on a route change.
    233 /// Silently does nothing if SketchyBar isn't installed.
    234 fn notify_route_change() {
    235     let _ = std::process::Command::new("sketchybar")
    236         .args(["--trigger", "hydra_route_change"])
    237         .stdout(std::process::Stdio::null())
    238         .stderr(std::process::Stdio::null())
    239         .spawn();
    240 }
    241 
    242 /// Build the "app → output" display label for a new route, and resolve the app's bundle id
    243 /// (the stable key persisted for restart). Returns `(label, bundle_id)`.
    244 fn route_label(pid: i32, output_uid: Option<&str>) -> (String, Option<String>) {
    245     let app = process::list_audio_processes().into_iter().find(|a| a.pid == pid);
    246     let name = app.as_ref().map(|a| a.name.clone()).unwrap_or_else(|| format!("pid {pid}"));
    247     let bundle_id = app.and_then(|a| a.bundle_id);
    248 
    249     let output = match output_uid {
    250         Some(uid) => hal::list_devices()
    251             .ok()
    252             .and_then(|ds| ds.into_iter().find(|d| d.uid == uid).map(|d| d.name))
    253             .unwrap_or_else(|| uid.to_string()),
    254         None => "Default Output".to_string(),
    255     };
    256 
    257     (format!("{name} → {output}"), bundle_id)
    258 }
    259 
    260 /// Display label for a hardware-input route, e.g. "MacBook Pro Microphone → Hydra".
    261 fn input_label(input_uid: &str, output_uid: Option<&str>) -> String {
    262     let devs = hal::list_devices().unwrap_or_default();
    263     let name = |uid: &str| devs.iter().find(|d| d.uid == uid).map(|d| d.name.clone());
    264     let src = name(input_uid).unwrap_or_else(|| input_uid.to_string());
    265     let dest = match output_uid {
    266         Some(uid) => name(uid).unwrap_or_else(|| uid.to_string()),
    267         None => "Default Output".to_string(),
    268     };
    269     format!("{src} → {dest}")
    270 }
    271 
    272 /// Build the label + bundle-id list for a combined (multi-source) route.
    273 /// Label reads like "Safari + Spotify +1 → Hydra".
    274 fn combined_label(pids: &[i32], output_uid: Option<&str>) -> (String, Vec<String>) {
    275     let apps = process::list_audio_processes();
    276     let names: Vec<String> = pids
    277         .iter()
    278         .map(|&pid| {
    279             apps.iter().find(|a| a.pid == pid).map(|a| a.name.clone()).unwrap_or_else(|| format!("pid {pid}"))
    280         })
    281         .collect();
    282     let bundle_ids: Vec<String> = pids
    283         .iter()
    284         .filter_map(|&pid| apps.iter().find(|a| a.pid == pid).and_then(|a| a.bundle_id.clone()))
    285         .collect();
    286 
    287     let sources = match names.len() {
    288         0 => "nothing".to_string(),
    289         1 => names[0].clone(),
    290         2 => format!("{} + {}", names[0], names[1]),
    291         n => format!("{} + {} +{}", names[0], names[1], n - 2),
    292     };
    293 
    294     let output = match output_uid {
    295         Some(uid) => hal::list_devices()
    296             .ok()
    297             .and_then(|ds| ds.into_iter().find(|d| d.uid == uid).map(|d| d.name))
    298             .unwrap_or_else(|| uid.to_string()),
    299         None => "Default Output".to_string(),
    300     };
    301 
    302     (format!("{sources} → {output}"), bundle_ids)
    303 }