hydra

Terminal replacement for Loopback — virtual audio devices and routing on macOS, from a ratatui TUI.
Log | Files | Refs | README | LICENSE

shim.rs (15712B)


      1 //! Safe Rust wrapper over the Objective-C tap shim (`tap_shim.m`).
      2 //!
      3 //! [`MonitorRoute`] owns a live monitor: a process tap feeding a private aggregate
      4 //! device whose IOProc copies the tapped audio to a hardware output. Dropping it tears
      5 //! the CoreAudio objects down in the correct order.
      6 
      7 use std::ffi::{c_void, CString};
      8 use std::os::raw::c_char;
      9 use std::path::PathBuf;
     10 use std::ptr;
     11 use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, Ordering};
     12 use std::sync::Arc;
     13 
     14 use anyhow::{bail, Result};
     15 
     16 use super::process::process_object_for_pid;
     17 use super::AudioObjectID;
     18 
/// Shared realtime parameters. Layout MUST match `HydraParams` in `tap_shim.m` exactly —
/// C `_Atomic int`/`unsigned int`/`unsigned long long` map to `AtomicI32`/`AtomicU32`/
/// `AtomicU64` (same size/align), and `float*` to `*mut f32`. A runtime size assertion in
/// `new_params` (which every `MonitorRoute` constructor goes through) guards against drift.
///
/// Only the total size is asserted, not per-field offsets, so reordering two same-sized
/// fields on one side would go undetected — keep both definitions in lockstep by hand.
#[repr(C)]
struct HydraParams {
    /// Linear gain; accessed from Rust with volatile reads/writes (`gain` / `set_gain`).
    gain: f32,
    /// Mute flag stored as an int (non-zero = muted); accessed with volatile reads/writes.
    muted: i32,
    /// Peak values; the Rust side only ever reads `peak[0]` (see `MonitorRoute::peak`).
    peak: [f32; 8],
    /// Not touched by the Rust code in this file beyond initialisation to 0.
    /// NOTE(review): presumably maintained by the shim — confirm in `tap_shim.m`.
    running: i32,
    /// Callback counter read by `MonitorRoute::callbacks`.
    callbacks: u64,
    // recording ring (SPSC)
    /// Non-zero while recording is armed; Rust stores 1 to arm and 0 to disarm.
    rec_on: AtomicI32,
    /// Start of the ring storage (owned by `Recorder::_ring`); null until a recording starts.
    rec_buf: *mut f32,
    /// Ring capacity, counted in `f32` elements.
    rec_cap: u32,
    /// Channel count captured when the recording was started.
    rec_channels: u32,
    /// Running count of floats written into the ring; the drain thread only loads it.
    rec_write: AtomicU64,
    /// Running count of floats consumed; advanced by the drain thread.
    rec_read: AtomicU64,
    /// Reset to 0 when a recording starts; never read by the Rust code in this file.
    rec_overruns: AtomicU64,
    // tap format, published by the IOProc / start
    /// Channel count of the tapped stream; 0 means "not known yet".
    fmt_channels: AtomicU32,
    /// Sample rate of the tapped stream in Hz; 0 means "not known yet".
    fmt_sample_rate: AtomicU32,
}
     42 
/// Opaque-to-Rust handle the shim fills in. Layout must match `HydraRoute`.
///
/// Rust creates it zeroed/null, passes it to a `hydra_*_start` function as the out-parameter,
/// and later hands the same value to `hydra_monitor_stop`. Rust never reads the fields itself.
#[repr(C)]
struct HydraRoute {
    /// Filled in by the shim. NOTE(review): presumably the process-tap object ID — confirm.
    tap: AudioObjectID,
    /// Filled in by the shim. NOTE(review): presumably the aggregate device ID — confirm.
    aggregate: AudioObjectID,
    /// Filled in by the shim; opaque pointer, never dereferenced from Rust.
    ioproc: *mut c_void,
    /// Filled in by the shim; initialised to null on the Rust side.
    params: *mut HydraParams,
}
     51 
// Entry points implemented in the Objective-C shim. Both start functions return an `i32`
// status: the Rust callers treat 0 as success and render anything else with `os_status`.
extern "C" {
    /// Start a monitor route over `n_procs` process objects. `output_uid` may be null
    /// (callers pass null when no output UID was given); `params` and `out_route` must be
    /// valid for writes.
    fn hydra_monitor_start(
        proc_objs: *const AudioObjectID,
        n_procs: i32,
        output_uid: *const c_char,
        params: *mut HydraParams,
        out_route: *mut HydraRoute,
    ) -> i32;
    /// Tear down a route previously filled in by one of the start functions. Called from
    /// `MonitorRoute`'s `Drop` for both tap routes and hardware-input routes.
    fn hydra_monitor_stop(route: *mut HydraRoute);
    /// `sizeof(HydraParams)` as the C compiler sees it — for the layout assertion.
    fn hydra_params_size() -> usize;
    /// Route a hardware INPUT device (e.g. the MacBook mic) to an output. No tap/TCC.
    fn hydra_input_start(
        input_uid: *const c_char,
        output_uid: *const c_char,
        params: *mut HydraParams,
        out_route: *mut HydraRoute,
    ) -> i32;
}
     71 
/// Process-wide counter used to mint route IDs (`r1`, `r2`, …); starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     73 
/// A running monitor route. Tearing down on `Drop` is what keeps coreaudiod from wedging.
pub struct MonitorRoute {
    /// Route identifier of the form `r<N>`, minted from `NEXT_ID` when the route starts.
    id: String,
    /// The PIDs whose audio this route taps and mixes together (the "combine" feature).
    /// Empty for hardware-input routes, which have no source process.
    pids: Vec<i32>,
    /// Handle filled in by the shim at start; passed back to `hydra_monitor_stop` on drop.
    raw: HydraRoute,
    /// Heap-stable params the C IOProc reads; we keep it alive for the route's lifetime.
    params: Box<HydraParams>,
    /// Active recording, if any. Always stopped (thread joined) before `params` drops.
    recorder: Option<Recorder>,
}
     85 
/// A live recording: owns the ring storage the IOProc writes into and the handle of the
/// drain thread that flushes it to a WAV file. The IOProc holds a raw pointer into the ring
/// and the route's params. This struct has no teardown logic of its own:
/// `MonitorRoute::stop_recording` clears `rec_on` and joins the drain thread before the
/// `Recorder` (and with it the ring) is dropped.
struct Recorder {
    /// Destination WAV file, reported by `MonitorRoute::recording_path`.
    path: PathBuf,
    /// Set to `true` to ask the drain thread to finish once the ring is empty.
    stop: Arc<AtomicBool>,
    /// Drain thread handle; its result is the number of floats written, or the I/O error.
    handle: Option<std::thread::JoinHandle<std::io::Result<u64>>>,
    /// Kept alive so the raw `rec_buf` pointer the IOProc holds stays valid.
    _ring: Box<[f32]>,
}
     96 
/// Wrap a raw params pointer so it can cross into the drain thread. Safe because the params
/// box outlives the thread (joined in `MonitorRoute::stop_recording`, which `Drop` also
/// calls) and the thread only touches atomics, `rec_cap`, and the ring.
struct ParamsPtr(*const HydraParams);
// SAFETY: the pointee is owned by a `MonitorRoute` that joins the drain thread before the
// params box is freed, so the pointer stays valid for as long as the thread can use it.
unsafe impl Send for ParamsPtr {}
    101 
// SAFETY: the CoreAudio handles are just IDs. The raw pointers this type carries are the
// opaque `ioproc` value (never dereferenced from Rust), the shim-filled `params` pointer, and
// `rec_buf`, which points into storage owned by the route's own `Recorder`. Everything they
// refer to lives as long as the route, so moving it between threads is fine (the daemon
// holds it behind a Mutex).
// NOTE(review): this assumes the shim's stop call may run on a different thread than start.
unsafe impl Send for MonitorRoute {}
    105 
    106 /// Allocate a fresh, heap-stable `HydraParams` for a new route. Asserts the Rust/C layouts
    107 /// match (the struct is shared with the realtime IOProc; a mismatch would corrupt audio-thread
    108 /// memory) before any route is created.
    109 fn new_params(gain: f32) -> Box<HydraParams> {
    110     assert_eq!(
    111         std::mem::size_of::<HydraParams>(),
    112         unsafe { hydra_params_size() },
    113         "HydraParams layout mismatch between Rust and tap_shim.m"
    114     );
    115     Box::new(HydraParams {
    116         gain,
    117         muted: 0,
    118         peak: [0.0; 8],
    119         running: 0,
    120         callbacks: 0,
    121         rec_on: AtomicI32::new(0),
    122         rec_buf: ptr::null_mut(),
    123         rec_cap: 0,
    124         rec_channels: 0,
    125         rec_write: AtomicU64::new(0),
    126         rec_read: AtomicU64::new(0),
    127         rec_overruns: AtomicU64::new(0),
    128         fmt_channels: AtomicU32::new(0),
    129         fmt_sample_rate: AtomicU32::new(0),
    130     })
    131 }
    132 
    133 impl MonitorRoute {
    134     /// Tap a single `pid` and monitor it to `output_uid` (or the default output if `None`).
    135     pub fn start(pid: i32, output_uid: Option<&str>, gain: f32) -> Result<Self> {
    136         Self::start_combined(&[pid], output_uid, gain)
    137     }
    138 
    139     /// Tap several PIDs at once, mixing their audio into one route. The CATapDescription is
    140     /// built over every process's tap object (`initStereoMixdownOfProcesses:`), so the
    141     /// aggregate's input is the sum of all of them.
    142     pub fn start_combined(pids: &[i32], output_uid: Option<&str>, gain: f32) -> Result<Self> {
    143         if pids.is_empty() {
    144             bail!("a route needs at least one source");
    145         }
    146         let proc_objs: Vec<AudioObjectID> =
    147             pids.iter().map(|&p| process_object_for_pid(p)).collect::<Result<_>>()?;
    148 
    149         let mut params = new_params(gain);
    150         let mut raw = HydraRoute { tap: 0, aggregate: 0, ioproc: ptr::null_mut(), params: ptr::null_mut() };
    151 
    152         let c_uid = output_uid.map(CString::new).transpose()?;
    153         let uid_ptr = c_uid.as_ref().map_or(ptr::null(), |c| c.as_ptr());
    154 
    155         let st = unsafe {
    156             hydra_monitor_start(
    157                 proc_objs.as_ptr(),
    158                 proc_objs.len() as i32,
    159                 uid_ptr,
    160                 params.as_mut() as *mut _,
    161                 &mut raw,
    162             )
    163         };
    164         if st != 0 {
    165             bail!("starting monitor for pids {pids:?} failed: {}", os_status(st));
    166         }
    167 
    168         Ok(Self {
    169             id: format!("r{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)),
    170             pids: pids.to_vec(),
    171             raw,
    172             params,
    173             recorder: None,
    174         })
    175     }
    176 
    177     /// Route a hardware input device (by UID — e.g. the MacBook mic) to `output_uid` (or the
    178     /// default output). No process tap, no TCC consent: a plain input→output aggregate.
    179     pub fn start_input(input_uid: &str, output_uid: Option<&str>, gain: f32) -> Result<Self> {
    180         let mut params = new_params(gain);
    181         let mut raw = HydraRoute { tap: 0, aggregate: 0, ioproc: ptr::null_mut(), params: ptr::null_mut() };
    182 
    183         let c_in = CString::new(input_uid)?;
    184         let c_out = output_uid.map(CString::new).transpose()?;
    185         let out_ptr = c_out.as_ref().map_or(ptr::null(), |c| c.as_ptr());
    186 
    187         let st = unsafe { hydra_input_start(c_in.as_ptr(), out_ptr, params.as_mut() as *mut _, &mut raw) };
    188         if st != 0 {
    189             bail!("starting input route for {input_uid} failed: {}", os_status(st));
    190         }
    191 
    192         Ok(Self {
    193             id: format!("r{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)),
    194             pids: Vec::new(), // hardware input has no PID
    195             raw,
    196             params,
    197             recorder: None,
    198         })
    199     }
    200 
    201     pub fn id(&self) -> &str {
    202         &self.id
    203     }
    204 
    205     pub fn pids(&self) -> &[i32] {
    206         &self.pids
    207     }
    208 
    209     pub fn set_gain(&mut self, gain: f32) {
    210         unsafe { ptr::write_volatile(&mut self.params.gain, gain) };
    211     }
    212 
    213     pub fn set_muted(&mut self, muted: bool) {
    214         unsafe { ptr::write_volatile(&mut self.params.muted, muted as i32) };
    215     }
    216 
    217     pub fn gain(&self) -> f32 {
    218         unsafe { ptr::read_volatile(&self.params.gain) }
    219     }
    220 
    221     pub fn muted(&self) -> bool {
    222         unsafe { ptr::read_volatile(&self.params.muted) != 0 }
    223     }
    224 
    225     /// How many times the IOProc has fired (0 ⇒ the aggregate is never being clocked).
    226     pub fn callbacks(&self) -> u64 {
    227         unsafe { ptr::read_volatile(&self.params.callbacks) }
    228     }
    229 
    230     /// Most recent peak written by the IOProc (0.0..~1.0). Stored in `peak[0]`.
    231     pub fn peak(&self) -> f32 {
    232         unsafe { ptr::read_volatile(&self.params.peak[0]) }
    233     }
    234 
    235     pub fn is_recording(&self) -> bool {
    236         self.recorder.is_some()
    237     }
    238 
    239     /// The file currently being recorded to, if any.
    240     pub fn recording_path(&self) -> Option<&std::path::Path> {
    241         self.recorder.as_ref().map(|r| r.path.as_path())
    242     }
    243 
    244     /// Begin recording the tapped audio to a WAV file at `path`. The IOProc fills an SPSC
    245     /// ring; a drain thread writes the file. Errors if already recording or if the tap
    246     /// format isn't known yet (no callback has fired — start playback first).
    247     pub fn start_recording(&mut self, path: PathBuf) -> Result<()> {
    248         if self.recorder.is_some() {
    249             bail!("already recording");
    250         }
    251         let channels = self.params.fmt_channels.load(Ordering::Acquire);
    252         let sample_rate = self.params.fmt_sample_rate.load(Ordering::Acquire);
    253         if channels == 0 || sample_rate == 0 {
    254             bail!("tap format not ready yet — start audio in the app first, then record");
    255         }
    256 
    257         // ~4 seconds of ring headroom; the drain thread empties it continuously.
    258         let cap = (sample_rate as usize * channels as usize * 4).next_power_of_two();
    259         let mut ring = vec![0.0f32; cap].into_boxed_slice();
    260 
    261         // Point the IOProc at the ring and arm it. Order matters: set buf/cap/channels and
    262         // reset indices BEFORE flipping rec_on, so the audio thread never sees a half-armed ring.
    263         self.params.rec_buf = ring.as_mut_ptr();
    264         self.params.rec_cap = cap as u32;
    265         self.params.rec_channels = channels;
    266         self.params.rec_read.store(0, Ordering::Relaxed);
    267         self.params.rec_write.store(0, Ordering::Relaxed);
    268         self.params.rec_overruns.store(0, Ordering::Relaxed);
    269         self.params.rec_on.store(1, Ordering::Release);
    270 
    271         let stop = Arc::new(AtomicBool::new(false));
    272         let stop_t = Arc::clone(&stop);
    273         let pp = ParamsPtr(self.params.as_ref() as *const HydraParams);
    274         let out_path = path.clone();
    275         let handle = std::thread::spawn(move || drain_to_wav(pp, stop_t, &out_path, channels, sample_rate));
    276 
    277         self.recorder = Some(Recorder { path, stop, handle: Some(handle), _ring: ring });
    278         Ok(())
    279     }
    280 
    281     /// Stop recording: disarm the IOProc, join the drain thread, finalize the WAV. Returns
    282     /// the number of sample-frames written.
    283     pub fn stop_recording(&mut self) -> Result<u64> {
    284         let Some(mut rec) = self.recorder.take() else {
    285             bail!("not recording");
    286         };
    287         // Disarm the audio thread first so it stops touching the ring, then drain+join.
    288         self.params.rec_on.store(0, Ordering::Release);
    289         rec.stop.store(true, Ordering::Release);
    290         let frames = match rec.handle.take().map(|h| h.join()) {
    291             Some(Ok(Ok(floats))) => floats / self.params.rec_channels.max(1) as u64,
    292             Some(Ok(Err(e))) => bail!("recording write error: {e}"),
    293             Some(Err(_)) => bail!("recording thread panicked"),
    294             None => 0,
    295         };
    296         // ring (rec._ring) frees here, after the thread is joined and the IOProc disarmed.
    297         Ok(frames)
    298     }
    299 }
    300 
    301 impl Drop for MonitorRoute {
    302     fn drop(&mut self) {
    303         // Stop recording (joins the drain thread, disarms the IOProc) BEFORE tearing down
    304         // CoreAudio and freeing params — otherwise the audio thread could touch freed memory.
    305         if self.recorder.is_some() {
    306             let _ = self.stop_recording();
    307         }
    308         unsafe { hydra_monitor_stop(&mut self.raw) };
    309         // `params` Box is freed here, after the IOProc is guaranteed gone.
    310     }
    311 }
    312 
    313 /// Drain the SPSC ring to a 32-bit-float WAV until `stop` is set and the ring is empty.
    314 /// Runs on its own thread; the only shared state is atomics + the ring buffer.
    315 fn drain_to_wav(
    316     pp: ParamsPtr,
    317     stop: Arc<AtomicBool>,
    318     path: &std::path::Path,
    319     channels: u32,
    320     sample_rate: u32,
    321 ) -> std::io::Result<u64> {
    322     use std::io::Write;
    323     let params = unsafe { &*pp.0 };
    324     let cap = params.rec_cap as usize;
    325 
    326     let file = std::fs::File::create(path)?;
    327     let mut w = std::io::BufWriter::new(file);
    328     write_wav_header(&mut w, channels, sample_rate)?;
    329 
    330     let mut total_floats: u64 = 0;
    331     let mut scratch: Vec<f32> = Vec::with_capacity(cap);
    332     loop {
    333         let write = params.rec_write.load(Ordering::Acquire);
    334         let read = params.rec_read.load(Ordering::Relaxed);
    335         let avail = (write - read) as usize;
    336         if avail > 0 {
    337             scratch.clear();
    338             for i in 0..avail {
    339                 scratch.push(params.rec_buf_get((read as usize + i) % cap));
    340             }
    341             let mut bytes = Vec::with_capacity(avail * 4);
    342             for &s in &scratch {
    343                 bytes.extend_from_slice(&s.to_le_bytes());
    344             }
    345             w.write_all(&bytes)?;
    346             params.rec_read.store(write, Ordering::Release);
    347             total_floats += avail as u64;
    348         } else if stop.load(Ordering::Acquire) {
    349             break;
    350         } else {
    351             std::thread::sleep(std::time::Duration::from_millis(20));
    352         }
    353     }
    354     w.flush()?;
    355     finalize_wav(w.into_inner().map_err(|e| e.into_error())?, total_floats)?;
    356     Ok(total_floats)
    357 }
    358 
    359 impl HydraParams {
    360     /// Read one float from the recording ring (drain thread only).
    361     fn rec_buf_get(&self, idx: usize) -> f32 {
    362         unsafe { *self.rec_buf.add(idx) }
    363     }
    364 }
    365 
    366 /// Write a canonical 44-byte WAV header for 32-bit IEEE float PCM, with placeholder sizes
    367 /// (patched by [`finalize_wav`] once the total is known).
    368 fn write_wav_header<W: std::io::Write>(w: &mut W, channels: u32, sample_rate: u32) -> std::io::Result<()> {
    369     let ch = channels as u16;
    370     let byte_rate = sample_rate * channels * 4;
    371     let block_align = (channels * 4) as u16;
    372     w.write_all(b"RIFF")?;
    373     w.write_all(&0u32.to_le_bytes())?; // RIFF size — patched later
    374     w.write_all(b"WAVE")?;
    375     w.write_all(b"fmt ")?;
    376     w.write_all(&16u32.to_le_bytes())?; // fmt chunk size
    377     w.write_all(&3u16.to_le_bytes())?; // format 3 = IEEE float
    378     w.write_all(&ch.to_le_bytes())?;
    379     w.write_all(&sample_rate.to_le_bytes())?;
    380     w.write_all(&byte_rate.to_le_bytes())?;
    381     w.write_all(&block_align.to_le_bytes())?;
    382     w.write_all(&32u16.to_le_bytes())?; // bits per sample
    383     w.write_all(b"data")?;
    384     w.write_all(&0u32.to_le_bytes())?; // data size — patched later
    385     Ok(())
    386 }
    387 
    388 /// Patch the RIFF + data sizes in a finished WAV now that the float count is known.
    389 fn finalize_wav(mut file: std::fs::File, total_floats: u64) -> std::io::Result<()> {
    390     use std::io::{Seek, SeekFrom, Write};
    391     let data_bytes = (total_floats * 4) as u32;
    392     file.seek(SeekFrom::Start(4))?;
    393     file.write_all(&(36 + data_bytes).to_le_bytes())?; // RIFF size = 36 + data
    394     file.seek(SeekFrom::Start(40))?;
    395     file.write_all(&data_bytes.to_le_bytes())?; // data chunk size
    396     file.flush()
    397 }
    398 
    399 /// Render an OSStatus as a four-char-code if printable, else a number — CoreAudio errors
    400 /// are usually FourCCs (e.g. `!pri` = no permission, `!dev` = bad device).
    401 fn os_status(st: i32) -> String {
    402     let b = (st as u32).to_be_bytes();
    403     if b.iter().all(|&c| c.is_ascii_graphic() || c == b' ') {
    404         format!("'{}{}{}{}' ({st})", b[0] as char, b[1] as char, b[2] as char, b[3] as char)
    405     } else {
    406         st.to_string()
    407     }
    408 }