shim.rs (15712B)
1 //! Safe Rust wrapper over the Objective-C tap shim (`tap_shim.m`). 2 //! 3 //! [`MonitorRoute`] owns a live monitor: a process tap feeding a private aggregate 4 //! device whose IOProc copies the tapped audio to a hardware output. Dropping it tears 5 //! the CoreAudio objects down in the correct order. 6 7 use std::ffi::{c_void, CString}; 8 use std::os::raw::c_char; 9 use std::path::PathBuf; 10 use std::ptr; 11 use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, Ordering}; 12 use std::sync::Arc; 13 14 use anyhow::{bail, Result}; 15 16 use super::process::process_object_for_pid; 17 use super::AudioObjectID; 18 19 /// Shared realtime parameters. Layout MUST match `HydraParams` in `tap_shim.m` exactly — 20 /// C `_Atomic int`/`unsigned int`/`unsigned long long` map to `AtomicI32`/`AtomicU32`/ 21 /// `AtomicU64` (same size/align), and `float*` to `*mut f32`. A runtime size assertion in 22 /// `MonitorRoute::start_combined` guards against drift. 23 #[repr(C)] 24 struct HydraParams { 25 gain: f32, 26 muted: i32, 27 peak: [f32; 8], 28 running: i32, 29 callbacks: u64, 30 // recording ring (SPSC) 31 rec_on: AtomicI32, 32 rec_buf: *mut f32, 33 rec_cap: u32, 34 rec_channels: u32, 35 rec_write: AtomicU64, 36 rec_read: AtomicU64, 37 rec_overruns: AtomicU64, 38 // tap format, published by the IOProc / start 39 fmt_channels: AtomicU32, 40 fmt_sample_rate: AtomicU32, 41 } 42 43 /// Opaque-to-Rust handle the shim fills in. Layout must match `HydraRoute`. 44 #[repr(C)] 45 struct HydraRoute { 46 tap: AudioObjectID, 47 aggregate: AudioObjectID, 48 ioproc: *mut c_void, 49 params: *mut HydraParams, 50 } 51 52 extern "C" { 53 fn hydra_monitor_start( 54 proc_objs: *const AudioObjectID, 55 n_procs: i32, 56 output_uid: *const c_char, 57 params: *mut HydraParams, 58 out_route: *mut HydraRoute, 59 ) -> i32; 60 fn hydra_monitor_stop(route: *mut HydraRoute); 61 /// `sizeof(HydraParams)` as the C compiler sees it — for the layout assertion. 62 fn hydra_params_size() -> usize; 63 /// Route a hardware INPUT device (e.g. the MacBook mic) to an output. No tap/TCC. 64 fn hydra_input_start( 65 input_uid: *const c_char, 66 output_uid: *const c_char, 67 params: *mut HydraParams, 68 out_route: *mut HydraRoute, 69 ) -> i32; 70 } 71 72 static NEXT_ID: AtomicU64 = AtomicU64::new(1); 73 74 /// A running monitor route. Tearing down on `Drop` is what keeps coreaudiod from wedging. 75 pub struct MonitorRoute { 76 id: String, 77 /// The PIDs whose audio this route taps and mixes together (the "combine" feature). 78 pids: Vec<i32>, 79 raw: HydraRoute, 80 /// Heap-stable params the C IOProc reads; we keep it alive for the route's lifetime. 81 params: Box<HydraParams>, 82 /// Active recording, if any. Always stopped (thread joined) before `params` drops. 83 recorder: Option<Recorder>, 84 } 85 86 /// A live recording: owns the ring storage the IOProc writes into and the drain thread that 87 /// flushes it to a WAV file. The IOProc holds a raw pointer into `ring`/`params`; this struct 88 /// guarantees the drain thread is joined and `rec_on` cleared before either is freed. 89 struct Recorder { 90 path: PathBuf, 91 stop: Arc<AtomicBool>, 92 handle: Option<std::thread::JoinHandle<std::io::Result<u64>>>, 93 /// Kept alive so the raw `rec_buf` pointer the IOProc holds stays valid. 94 _ring: Box<[f32]>, 95 } 96 97 /// Wrap a raw params pointer so it can cross into the drain thread. Safe because the params 98 /// box outlives the thread (joined in `Recorder::stop`) and we only touch atomics + the ring. 99 struct ParamsPtr(*const HydraParams); 100 unsafe impl Send for ParamsPtr {} 101 102 // The CoreAudio handles are just IDs; the only pointer is into our own boxed params, 103 // which outlives any access. Safe to move between threads (the daemon holds it behind a Mutex). 104 unsafe impl Send for MonitorRoute {} 105 106 /// Allocate a fresh, heap-stable `HydraParams` for a new route. Asserts the Rust/C layouts 107 /// match (the struct is shared with the realtime IOProc; a mismatch would corrupt audio-thread 108 /// memory) before any route is created. 109 fn new_params(gain: f32) -> Box<HydraParams> { 110 assert_eq!( 111 std::mem::size_of::<HydraParams>(), 112 unsafe { hydra_params_size() }, 113 "HydraParams layout mismatch between Rust and tap_shim.m" 114 ); 115 Box::new(HydraParams { 116 gain, 117 muted: 0, 118 peak: [0.0; 8], 119 running: 0, 120 callbacks: 0, 121 rec_on: AtomicI32::new(0), 122 rec_buf: ptr::null_mut(), 123 rec_cap: 0, 124 rec_channels: 0, 125 rec_write: AtomicU64::new(0), 126 rec_read: AtomicU64::new(0), 127 rec_overruns: AtomicU64::new(0), 128 fmt_channels: AtomicU32::new(0), 129 fmt_sample_rate: AtomicU32::new(0), 130 }) 131 } 132 133 impl MonitorRoute { 134 /// Tap a single `pid` and monitor it to `output_uid` (or the default output if `None`). 135 pub fn start(pid: i32, output_uid: Option<&str>, gain: f32) -> Result<Self> { 136 Self::start_combined(&[pid], output_uid, gain) 137 } 138 139 /// Tap several PIDs at once, mixing their audio into one route. The CATapDescription is 140 /// built over every process's tap object (`initStereoMixdownOfProcesses:`), so the 141 /// aggregate's input is the sum of all of them. 142 pub fn start_combined(pids: &[i32], output_uid: Option<&str>, gain: f32) -> Result<Self> { 143 if pids.is_empty() { 144 bail!("a route needs at least one source"); 145 } 146 let proc_objs: Vec<AudioObjectID> = 147 pids.iter().map(|&p| process_object_for_pid(p)).collect::<Result<_>>()?; 148 149 let mut params = new_params(gain); 150 let mut raw = HydraRoute { tap: 0, aggregate: 0, ioproc: ptr::null_mut(), params: ptr::null_mut() }; 151 152 let c_uid = output_uid.map(CString::new).transpose()?; 153 let uid_ptr = c_uid.as_ref().map_or(ptr::null(), |c| c.as_ptr()); 154 155 let st = unsafe { 156 hydra_monitor_start( 157 proc_objs.as_ptr(), 158 proc_objs.len() as i32, 159 uid_ptr, 160 params.as_mut() as *mut _, 161 &mut raw, 162 ) 163 }; 164 if st != 0 { 165 bail!("starting monitor for pids {pids:?} failed: {}", os_status(st)); 166 } 167 168 Ok(Self { 169 id: format!("r{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)), 170 pids: pids.to_vec(), 171 raw, 172 params, 173 recorder: None, 174 }) 175 } 176 177 /// Route a hardware input device (by UID — e.g. the MacBook mic) to `output_uid` (or the 178 /// default output). No process tap, no TCC consent: a plain input→output aggregate. 179 pub fn start_input(input_uid: &str, output_uid: Option<&str>, gain: f32) -> Result<Self> { 180 let mut params = new_params(gain); 181 let mut raw = HydraRoute { tap: 0, aggregate: 0, ioproc: ptr::null_mut(), params: ptr::null_mut() }; 182 183 let c_in = CString::new(input_uid)?; 184 let c_out = output_uid.map(CString::new).transpose()?; 185 let out_ptr = c_out.as_ref().map_or(ptr::null(), |c| c.as_ptr()); 186 187 let st = unsafe { hydra_input_start(c_in.as_ptr(), out_ptr, params.as_mut() as *mut _, &mut raw) }; 188 if st != 0 { 189 bail!("starting input route for {input_uid} failed: {}", os_status(st)); 190 } 191 192 Ok(Self { 193 id: format!("r{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)), 194 pids: Vec::new(), // hardware input has no PID 195 raw, 196 params, 197 recorder: None, 198 }) 199 } 200 201 pub fn id(&self) -> &str { 202 &self.id 203 } 204 205 pub fn pids(&self) -> &[i32] { 206 &self.pids 207 } 208 209 pub fn set_gain(&mut self, gain: f32) { 210 unsafe { ptr::write_volatile(&mut self.params.gain, gain) }; 211 } 212 213 pub fn set_muted(&mut self, muted: bool) { 214 unsafe { ptr::write_volatile(&mut self.params.muted, muted as i32) }; 215 } 216 217 pub fn gain(&self) -> f32 { 218 unsafe { ptr::read_volatile(&self.params.gain) } 219 } 220 221 pub fn muted(&self) -> bool { 222 unsafe { ptr::read_volatile(&self.params.muted) != 0 } 223 } 224 225 /// How many times the IOProc has fired (0 ⇒ the aggregate is never being clocked). 226 pub fn callbacks(&self) -> u64 { 227 unsafe { ptr::read_volatile(&self.params.callbacks) } 228 } 229 230 /// Most recent peak written by the IOProc (0.0..~1.0). Stored in `peak[0]`. 231 pub fn peak(&self) -> f32 { 232 unsafe { ptr::read_volatile(&self.params.peak[0]) } 233 } 234 235 pub fn is_recording(&self) -> bool { 236 self.recorder.is_some() 237 } 238 239 /// The file currently being recorded to, if any. 240 pub fn recording_path(&self) -> Option<&std::path::Path> { 241 self.recorder.as_ref().map(|r| r.path.as_path()) 242 } 243 244 /// Begin recording the tapped audio to a WAV file at `path`. The IOProc fills an SPSC 245 /// ring; a drain thread writes the file. Errors if already recording or if the tap 246 /// format isn't known yet (no callback has fired — start playback first). 247 pub fn start_recording(&mut self, path: PathBuf) -> Result<()> { 248 if self.recorder.is_some() { 249 bail!("already recording"); 250 } 251 let channels = self.params.fmt_channels.load(Ordering::Acquire); 252 let sample_rate = self.params.fmt_sample_rate.load(Ordering::Acquire); 253 if channels == 0 || sample_rate == 0 { 254 bail!("tap format not ready yet — start audio in the app first, then record"); 255 } 256 257 // ~4 seconds of ring headroom; the drain thread empties it continuously. 258 let cap = (sample_rate as usize * channels as usize * 4).next_power_of_two(); 259 let mut ring = vec![0.0f32; cap].into_boxed_slice(); 260 261 // Point the IOProc at the ring and arm it. Order matters: set buf/cap/channels and 262 // reset indices BEFORE flipping rec_on, so the audio thread never sees a half-armed ring. 263 self.params.rec_buf = ring.as_mut_ptr(); 264 self.params.rec_cap = cap as u32; 265 self.params.rec_channels = channels; 266 self.params.rec_read.store(0, Ordering::Relaxed); 267 self.params.rec_write.store(0, Ordering::Relaxed); 268 self.params.rec_overruns.store(0, Ordering::Relaxed); 269 self.params.rec_on.store(1, Ordering::Release); 270 271 let stop = Arc::new(AtomicBool::new(false)); 272 let stop_t = Arc::clone(&stop); 273 let pp = ParamsPtr(self.params.as_ref() as *const HydraParams); 274 let out_path = path.clone(); 275 let handle = std::thread::spawn(move || drain_to_wav(pp, stop_t, &out_path, channels, sample_rate)); 276 277 self.recorder = Some(Recorder { path, stop, handle: Some(handle), _ring: ring }); 278 Ok(()) 279 } 280 281 /// Stop recording: disarm the IOProc, join the drain thread, finalize the WAV. Returns 282 /// the number of sample-frames written. 283 pub fn stop_recording(&mut self) -> Result<u64> { 284 let Some(mut rec) = self.recorder.take() else { 285 bail!("not recording"); 286 }; 287 // Disarm the audio thread first so it stops touching the ring, then drain+join. 288 self.params.rec_on.store(0, Ordering::Release); 289 rec.stop.store(true, Ordering::Release); 290 let frames = match rec.handle.take().map(|h| h.join()) { 291 Some(Ok(Ok(floats))) => floats / self.params.rec_channels.max(1) as u64, 292 Some(Ok(Err(e))) => bail!("recording write error: {e}"), 293 Some(Err(_)) => bail!("recording thread panicked"), 294 None => 0, 295 }; 296 // ring (rec._ring) frees here, after the thread is joined and the IOProc disarmed. 297 Ok(frames) 298 } 299 } 300 301 impl Drop for MonitorRoute { 302 fn drop(&mut self) { 303 // Stop recording (joins the drain thread, disarms the IOProc) BEFORE tearing down 304 // CoreAudio and freeing params — otherwise the audio thread could touch freed memory. 305 if self.recorder.is_some() { 306 let _ = self.stop_recording(); 307 } 308 unsafe { hydra_monitor_stop(&mut self.raw) }; 309 // `params` Box is freed here, after the IOProc is guaranteed gone. 310 } 311 } 312 313 /// Drain the SPSC ring to a 32-bit-float WAV until `stop` is set and the ring is empty. 314 /// Runs on its own thread; the only shared state is atomics + the ring buffer. 315 fn drain_to_wav( 316 pp: ParamsPtr, 317 stop: Arc<AtomicBool>, 318 path: &std::path::Path, 319 channels: u32, 320 sample_rate: u32, 321 ) -> std::io::Result<u64> { 322 use std::io::Write; 323 let params = unsafe { &*pp.0 }; 324 let cap = params.rec_cap as usize; 325 326 let file = std::fs::File::create(path)?; 327 let mut w = std::io::BufWriter::new(file); 328 write_wav_header(&mut w, channels, sample_rate)?; 329 330 let mut total_floats: u64 = 0; 331 let mut scratch: Vec<f32> = Vec::with_capacity(cap); 332 loop { 333 let write = params.rec_write.load(Ordering::Acquire); 334 let read = params.rec_read.load(Ordering::Relaxed); 335 let avail = (write - read) as usize; 336 if avail > 0 { 337 scratch.clear(); 338 for i in 0..avail { 339 scratch.push(params.rec_buf_get((read as usize + i) % cap)); 340 } 341 let mut bytes = Vec::with_capacity(avail * 4); 342 for &s in &scratch { 343 bytes.extend_from_slice(&s.to_le_bytes()); 344 } 345 w.write_all(&bytes)?; 346 params.rec_read.store(write, Ordering::Release); 347 total_floats += avail as u64; 348 } else if stop.load(Ordering::Acquire) { 349 break; 350 } else { 351 std::thread::sleep(std::time::Duration::from_millis(20)); 352 } 353 } 354 w.flush()?; 355 finalize_wav(w.into_inner().map_err(|e| e.into_error())?, total_floats)?; 356 Ok(total_floats) 357 } 358 359 impl HydraParams { 360 /// Read one float from the recording ring (drain thread only). 361 fn rec_buf_get(&self, idx: usize) -> f32 { 362 unsafe { *self.rec_buf.add(idx) } 363 } 364 } 365 366 /// Write a canonical 44-byte WAV header for 32-bit IEEE float PCM, with placeholder sizes 367 /// (patched by [`finalize_wav`] once the total is known). 368 fn write_wav_header<W: std::io::Write>(w: &mut W, channels: u32, sample_rate: u32) -> std::io::Result<()> { 369 let ch = channels as u16; 370 let byte_rate = sample_rate * channels * 4; 371 let block_align = (channels * 4) as u16; 372 w.write_all(b"RIFF")?; 373 w.write_all(&0u32.to_le_bytes())?; // RIFF size — patched later 374 w.write_all(b"WAVE")?; 375 w.write_all(b"fmt ")?; 376 w.write_all(&16u32.to_le_bytes())?; // fmt chunk size 377 w.write_all(&3u16.to_le_bytes())?; // format 3 = IEEE float 378 w.write_all(&ch.to_le_bytes())?; 379 w.write_all(&sample_rate.to_le_bytes())?; 380 w.write_all(&byte_rate.to_le_bytes())?; 381 w.write_all(&block_align.to_le_bytes())?; 382 w.write_all(&32u16.to_le_bytes())?; // bits per sample 383 w.write_all(b"data")?; 384 w.write_all(&0u32.to_le_bytes())?; // data size — patched later 385 Ok(()) 386 } 387 388 /// Patch the RIFF + data sizes in a finished WAV now that the float count is known. 389 fn finalize_wav(mut file: std::fs::File, total_floats: u64) -> std::io::Result<()> { 390 use std::io::{Seek, SeekFrom, Write}; 391 let data_bytes = (total_floats * 4) as u32; 392 file.seek(SeekFrom::Start(4))?; 393 file.write_all(&(36 + data_bytes).to_le_bytes())?; // RIFF size = 36 + data 394 file.seek(SeekFrom::Start(40))?; 395 file.write_all(&data_bytes.to_le_bytes())?; // data chunk size 396 file.flush() 397 } 398 399 /// Render an OSStatus as a four-char-code if printable, else a number — CoreAudio errors 400 /// are usually FourCCs (e.g. `!pri` = no permission, `!dev` = bad device). 401 fn os_status(st: i32) -> String { 402 let b = (st as u32).to_be_bytes(); 403 if b.iter().all(|&c| c.is_ascii_graphic() || c == b' ') { 404 format!("'{}{}{}{}' ({st})", b[0] as char, b[1] as char, b[2] as char, b[3] as char) 405 } else { 406 st.to_string() 407 } 408 }