From f924b4c0cdcc06a3372bc0484f3a18944ea5bacc Mon Sep 17 00:00:00 2001 From: Skyler Lehmkuhl Date: Wed, 11 Feb 2026 19:07:48 -0500 Subject: [PATCH] Stream audio instead of loading the whole thing into memory --- daw-backend/Cargo.toml | 3 + daw-backend/src/audio/disk_reader.rs | 587 ++++++++++++++++++ daw-backend/src/audio/engine.rs | 240 ++++++- daw-backend/src/audio/mod.rs | 3 +- daw-backend/src/audio/pool.rs | 422 ++++++++++--- daw-backend/src/audio/track.rs | 10 +- daw-backend/src/command/types.rs | 34 + daw-backend/src/io/audio_file.rs | 192 ++++++ daw-backend/src/io/mod.rs | 2 +- lightningbeam-ui/Cargo.lock | 1 + .../lightningbeam-editor/src/main.rs | 147 +++-- 11 files changed, 1481 insertions(+), 160 deletions(-) create mode 100644 daw-backend/src/audio/disk_reader.rs diff --git a/daw-backend/Cargo.toml b/daw-backend/Cargo.toml index b5a2bd1..e7bfb33 100644 --- a/daw-backend/Cargo.toml +++ b/daw-backend/Cargo.toml @@ -17,6 +17,9 @@ base64 = "0.22" pathdiff = "0.2" rayon = "1.10" +# Memory-mapped I/O for audio files +memmap2 = "0.9" + # Audio export hound = "3.5" ffmpeg-next = "8.0" # For MP3/AAC encoding diff --git a/daw-backend/src/audio/disk_reader.rs b/daw-backend/src/audio/disk_reader.rs new file mode 100644 index 0000000..c288128 --- /dev/null +++ b/daw-backend/src/audio/disk_reader.rs @@ -0,0 +1,587 @@ +//! Disk reader for streaming audio playback. +//! +//! Provides lock-free read-ahead buffers for audio files that cannot be kept +//! fully decoded in memory. A background thread fills these buffers ahead of +//! the playhead so the audio callback never blocks on I/O or decoding. +//! +//! **InMemory** files bypass the disk reader entirely — their data is already +//! available as `&[f32]`. **Mapped** files (mmap'd WAV/AIFF) also bypass the +//! disk reader for now (OS page cache handles paging). **Compressed** files +//! (MP3, FLAC, OGG, etc.) use a `CompressedReader` that stream-decodes on +//! demand via Symphonia into a `ReadAheadBuffer`. + +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; + +use symphonia::core::audio::SampleBuffer; +use symphonia::core::codecs::DecoderOptions; +use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo}; +use symphonia::core::io::MediaSourceStream; +use symphonia::core::meta::MetadataOptions; +use symphonia::core::probe::Hint; + +/// Read-ahead distance in seconds. +const PREFETCH_SECONDS: f64 = 2.0; + +/// How often the disk reader thread wakes up to check for work (ms). +const POLL_INTERVAL_MS: u64 = 5; + +// --------------------------------------------------------------------------- +// ReadAheadBuffer +// --------------------------------------------------------------------------- + +/// Lock-free read-ahead buffer shared between the disk reader (writer) and the +/// audio callback (reader). +/// +/// # Thread safety +/// +/// This is a **single-producer single-consumer** (SPSC) structure: +/// - **Producer** (disk reader thread): calls `write_samples()` and +/// `advance_start()` to fill and reclaim buffer space. +/// - **Consumer** (audio callback): calls `read_sample()` and `has_range()` +/// to access decoded audio. +/// +/// The producer only writes to indices **beyond** `valid_frames`, while the +/// consumer only reads indices **within** `[start_frame, start_frame + +/// valid_frames)`. Because the two threads always operate on disjoint regions, +/// the sample data itself requires no locking. Atomics with Acquire/Release +/// ordering on `start_frame` and `valid_frames` provide the happens-before +/// relationship that guarantees the consumer sees completed writes. +/// +/// The `UnsafeCell` wrapping the buffer data allows the producer to mutate it +/// through a shared `&self` reference. This is sound because only one thread +/// (the producer) ever writes, and it writes to a region that the consumer +/// cannot yet see (gated by the `valid_frames` atomic). +pub struct ReadAheadBuffer { + /// Interleaved f32 samples stored as a circular buffer. + /// Wrapped in `UnsafeCell` to allow the producer to write through `&self`. + buffer: UnsafeCell>, + /// The absolute frame number of the oldest valid frame in the ring. + start_frame: AtomicU64, + /// Number of valid frames starting from `start_frame`. + valid_frames: AtomicU64, + /// Total capacity in frames. + capacity_frames: usize, + /// Number of audio channels. + channels: u32, + /// Source file sample rate. + sample_rate: u32, +} + +// SAFETY: See the doc comment on ReadAheadBuffer for the full safety argument. +// In short: SPSC access pattern with atomic coordination means no data races. +// The circular design means advance_start never moves data — it only bumps +// the start pointer, so the consumer never sees partially-shifted memory. +unsafe impl Send for ReadAheadBuffer {} +unsafe impl Sync for ReadAheadBuffer {} + +impl std::fmt::Debug for ReadAheadBuffer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReadAheadBuffer") + .field("capacity_frames", &self.capacity_frames) + .field("channels", &self.channels) + .field("sample_rate", &self.sample_rate) + .field("start_frame", &self.start_frame.load(Ordering::Relaxed)) + .field("valid_frames", &self.valid_frames.load(Ordering::Relaxed)) + .finish() + } +} + +impl ReadAheadBuffer { + /// Create a new read-ahead buffer with the given capacity (in seconds). + pub fn new(capacity_seconds: f64, sample_rate: u32, channels: u32) -> Self { + let capacity_frames = (capacity_seconds * sample_rate as f64) as usize; + let buffer_len = capacity_frames * channels as usize; + Self { + buffer: UnsafeCell::new(vec![0.0f32; buffer_len].into_boxed_slice()), + start_frame: AtomicU64::new(0), + valid_frames: AtomicU64::new(0), + capacity_frames, + channels, + sample_rate, + } + } + + /// Map an absolute frame number to a ring-buffer sample index. + #[inline(always)] + fn ring_index(&self, frame: u64, channel: usize) -> usize { + let ring_frame = (frame as usize) % self.capacity_frames; + ring_frame * self.channels as usize + channel + } + + /// Snapshot the current valid range. Call once per audio callback, then + /// pass the returned `(start, end)` to `read_sample` for consistent reads. + #[inline] + pub fn snapshot(&self) -> (u64, u64) { + let start = self.start_frame.load(Ordering::Acquire); + let valid = self.valid_frames.load(Ordering::Acquire); + (start, start + valid) + } + + /// Read a single interleaved sample using a pre-loaded range snapshot. + /// Returns `0.0` if the frame is outside `[snap_start, snap_end)`. + /// Called from the **audio callback** (consumer). + #[inline] + pub fn read_sample(&self, frame: u64, channel: usize, snap_start: u64, snap_end: u64) -> f32 { + if frame < snap_start || frame >= snap_end { + return 0.0; + } + + let idx = self.ring_index(frame, channel); + // SAFETY: We only read indices that the producer has already written + // and published via valid_frames. The circular layout means + // advance_start never moves data, so no torn reads are possible. + let buffer = unsafe { &*self.buffer.get() }; + buffer[idx] + } + + /// Check whether a contiguous range of frames is fully available. + #[inline] + pub fn has_range(&self, start: u64, count: u64) -> bool { + let buf_start = self.start_frame.load(Ordering::Acquire); + let valid = self.valid_frames.load(Ordering::Acquire); + start >= buf_start && start + count <= buf_start + valid + } + + /// Current start frame of the buffer. + #[inline] + pub fn start_frame(&self) -> u64 { + self.start_frame.load(Ordering::Acquire) + } + + /// Number of valid frames currently in the buffer. + #[inline] + pub fn valid_frames_count(&self) -> u64 { + self.valid_frames.load(Ordering::Acquire) + } + + /// Reset the buffer to start at `new_start` with zero valid frames. + /// Called by the **disk reader thread** (producer) after a seek. + pub fn reset(&self, new_start: u64) { + self.valid_frames.store(0, Ordering::Release); + self.start_frame.store(new_start, Ordering::Release); + } + + /// Write interleaved samples into the buffer, extending the valid range. + /// Called by the **disk reader thread** (producer only). + /// Returns the number of frames actually written (may be less than `frames` + /// if the buffer is full). + /// + /// # Safety + /// Must only be called from the single producer thread. + pub fn write_samples(&self, samples: &[f32], frames: usize) -> usize { + let valid = self.valid_frames.load(Ordering::Acquire) as usize; + let remaining_capacity = self.capacity_frames - valid; + let write_frames = frames.min(remaining_capacity); + if write_frames == 0 { + return 0; + } + + let ch = self.channels as usize; + let start = self.start_frame.load(Ordering::Acquire); + let write_start_frame = start as usize + valid; + + // SAFETY: We only write to ring positions beyond the current valid + // range, which the consumer cannot access. Only one producer calls this. + let buffer = unsafe { &mut *self.buffer.get() }; + + // Write with wrap-around: the ring position may cross the buffer end. + let ring_start = (write_start_frame % self.capacity_frames) * ch; + let total_samples = write_frames * ch; + + let buffer_sample_len = self.capacity_frames * ch; + let first_chunk = total_samples.min(buffer_sample_len - ring_start); + + buffer[ring_start..ring_start + first_chunk] + .copy_from_slice(&samples[..first_chunk]); + + if first_chunk < total_samples { + // Wrap around to the beginning of the buffer. + let second_chunk = total_samples - first_chunk; + buffer[..second_chunk] + .copy_from_slice(&samples[first_chunk..first_chunk + second_chunk]); + } + + // Make the new samples visible to the consumer. + self.valid_frames + .store((valid + write_frames) as u64, Ordering::Release); + + write_frames + } + + /// Advance the buffer start, discarding frames behind the playhead. + /// Called by the **disk reader thread** (producer only) to reclaim space. + /// + /// Because this is a circular buffer, advancing the start only updates + /// atomic counters — no data is moved, so the consumer never sees + /// partially-shifted memory. + pub fn advance_start(&self, new_start: u64) { + let old_start = self.start_frame.load(Ordering::Acquire); + if new_start <= old_start { + return; + } + + let advance_frames = (new_start - old_start) as usize; + let valid = self.valid_frames.load(Ordering::Acquire) as usize; + + if advance_frames >= valid { + // All data is stale — just reset. + self.valid_frames.store(0, Ordering::Release); + self.start_frame.store(new_start, Ordering::Release); + return; + } + + let new_valid = valid - advance_frames; + // Store valid_frames first (shrinking the visible range), then + // advance start_frame. The consumer always sees a consistent + // sub-range of valid data. + self.valid_frames + .store(new_valid as u64, Ordering::Release); + self.start_frame.store(new_start, Ordering::Release); + } +} + +// --------------------------------------------------------------------------- +// CompressedReader +// --------------------------------------------------------------------------- + +/// Wraps a Symphonia decoder for streaming a single compressed audio file. +struct CompressedReader { + format_reader: Box, + decoder: Box, + track_id: u32, + /// Current decoder position in frames. + current_frame: u64, + sample_rate: u32, + channels: u32, + #[allow(dead_code)] + total_frames: u64, + /// Temporary decode buffer. + sample_buf: Option>, +} + +impl CompressedReader { + /// Open a compressed audio file and prepare for streaming decode. + fn open(path: &Path) -> Result { + let file = + std::fs::File::open(path).map_err(|e| format!("Failed to open file: {}", e))?; + let mss = MediaSourceStream::new(Box::new(file), Default::default()); + + let mut hint = Hint::new(); + if let Some(ext) = path.extension().and_then(|e| e.to_str()) { + hint.with_extension(ext); + } + + let probed = symphonia::default::get_probe() + .format( + &hint, + mss, + &FormatOptions::default(), + &MetadataOptions::default(), + ) + .map_err(|e| format!("Failed to probe file: {}", e))?; + + let format_reader = probed.format; + + let track = format_reader + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + .ok_or_else(|| "No audio tracks found".to_string())?; + + let track_id = track.id; + let codec_params = &track.codec_params; + let sample_rate = codec_params.sample_rate.unwrap_or(44100); + let channels = codec_params + .channels + .map(|c| c.count()) + .unwrap_or(2) as u32; + let total_frames = codec_params.n_frames.unwrap_or(0); + + let decoder = symphonia::default::get_codecs() + .make(codec_params, &DecoderOptions::default()) + .map_err(|e| format!("Failed to create decoder: {}", e))?; + + Ok(Self { + format_reader, + decoder, + track_id, + current_frame: 0, + sample_rate, + channels, + total_frames, + sample_buf: None, + }) + } + + /// Seek to a specific frame. Returns the actual frame reached (may differ + /// for compressed formats that can only seek to keyframes). + fn seek(&mut self, target_frame: u64) -> Result { + let seek_to = SeekTo::TimeStamp { + ts: target_frame, + track_id: self.track_id, + }; + + let seeked = self + .format_reader + .seek(SeekMode::Coarse, seek_to) + .map_err(|e| format!("Seek failed: {}", e))?; + + let actual_frame = seeked.actual_ts; + self.current_frame = actual_frame; + + // Reset the decoder after seeking. + self.decoder.reset(); + + Ok(actual_frame) + } + + /// Decode the next chunk of audio into `out`. Returns the number of frames + /// decoded. Returns `Ok(0)` at end-of-file. + fn decode_next(&mut self, out: &mut Vec) -> Result { + out.clear(); + + loop { + let packet = match self.format_reader.next_packet() { + Ok(p) => p, + Err(symphonia::core::errors::Error::IoError(ref e)) + if e.kind() == std::io::ErrorKind::UnexpectedEof => + { + return Ok(0); // EOF + } + Err(e) => return Err(format!("Read packet error: {}", e)), + }; + + if packet.track_id() != self.track_id { + continue; + } + + match self.decoder.decode(&packet) { + Ok(decoded) => { + if self.sample_buf.is_none() { + let spec = *decoded.spec(); + let duration = decoded.capacity() as u64; + self.sample_buf = Some(SampleBuffer::new(duration, spec)); + } + + if let Some(ref mut buf) = self.sample_buf { + buf.copy_interleaved_ref(decoded); + let samples = buf.samples(); + out.extend_from_slice(samples); + let frames = samples.len() / self.channels as usize; + self.current_frame += frames as u64; + return Ok(frames); + } + + return Ok(0); + } + Err(symphonia::core::errors::Error::DecodeError(_)) => { + continue; // Skip corrupt packets. + } + Err(e) => return Err(format!("Decode error: {}", e)), + } + } + } +} + +// --------------------------------------------------------------------------- +// DiskReaderCommand +// --------------------------------------------------------------------------- + +/// Commands sent from the engine to the disk reader thread. +pub enum DiskReaderCommand { + /// Start streaming a compressed file. + ActivateFile { + pool_index: usize, + path: PathBuf, + buffer: Arc, + }, + /// Stop streaming a file. + DeactivateFile { pool_index: usize }, + /// The playhead has jumped — refill buffers from the new position. + Seek { frame: u64 }, + /// Shut down the disk reader thread. + Shutdown, +} + +// --------------------------------------------------------------------------- +// DiskReader +// --------------------------------------------------------------------------- + +/// Manages background read-ahead for compressed audio files. +/// +/// The engine creates a `DiskReader` at startup. When a compressed file is +/// imported, it sends an `ActivateFile` command. The disk reader opens a +/// Symphonia decoder and starts filling the file's `ReadAheadBuffer` ahead +/// of the shared playhead. +pub struct DiskReader { + /// Channel to send commands to the background thread. + command_tx: rtrb::Producer, + /// Shared playhead position (frames). The engine updates this atomically. + #[allow(dead_code)] + playhead_frame: Arc, + /// Whether the reader thread is running. + running: Arc, + /// Background thread handle. + thread_handle: Option>, +} + +impl DiskReader { + /// Create a new disk reader with a background thread. + /// + /// `playhead_frame` should be the same `Arc` used by the engine + /// so the disk reader knows where to fill ahead. + pub fn new(playhead_frame: Arc, _sample_rate: u32) -> Self { + let (command_tx, command_rx) = rtrb::RingBuffer::new(64); + let running = Arc::new(AtomicBool::new(true)); + + let thread_running = running.clone(); + let thread_playhead = playhead_frame.clone(); + + let thread_handle = std::thread::Builder::new() + .name("disk-reader".into()) + .spawn(move || { + Self::reader_thread(command_rx, thread_playhead, thread_running); + }) + .expect("Failed to spawn disk reader thread"); + + Self { + command_tx, + playhead_frame, + running, + thread_handle: Some(thread_handle), + } + } + + /// Send a command to the disk reader thread. + pub fn send(&mut self, cmd: DiskReaderCommand) { + let _ = self.command_tx.push(cmd); + } + + /// Create a `ReadAheadBuffer` for a compressed file. + pub fn create_buffer(sample_rate: u32, channels: u32) -> Arc { + Arc::new(ReadAheadBuffer::new( + PREFETCH_SECONDS + 1.0, // extra headroom + sample_rate, + channels, + )) + } + + /// The disk reader background thread. + fn reader_thread( + mut command_rx: rtrb::Consumer, + playhead_frame: Arc, + running: Arc, + ) { + let mut active_files: HashMap)> = + HashMap::new(); + let mut decode_buf = Vec::with_capacity(8192); + + while running.load(Ordering::Relaxed) { + // Process commands. + while let Ok(cmd) = command_rx.pop() { + match cmd { + DiskReaderCommand::ActivateFile { + pool_index, + path, + buffer, + } => match CompressedReader::open(&path) { + Ok(reader) => { + eprintln!("[DiskReader] Activated pool={}, ch={}, sr={}, path={:?}", + pool_index, reader.channels, reader.sample_rate, path); + active_files.insert(pool_index, (reader, buffer)); + } + Err(e) => { + eprintln!( + "[DiskReader] Failed to open compressed file {:?}: {}", + path, e + ); + } + }, + DiskReaderCommand::DeactivateFile { pool_index } => { + active_files.remove(&pool_index); + } + DiskReaderCommand::Seek { frame } => { + for (_, (reader, buffer)) in active_files.iter_mut() { + buffer.reset(frame); + if let Err(e) = reader.seek(frame) { + eprintln!("[DiskReader] Seek error: {}", e); + } + } + } + DiskReaderCommand::Shutdown => { + return; + } + } + } + + let playhead = playhead_frame.load(Ordering::Relaxed); + + // Fill each active file's buffer ahead of the playhead. + for (_pool_index, (reader, buffer)) in active_files.iter_mut() { + let buf_start = buffer.start_frame(); + let buf_valid = buffer.valid_frames_count(); + let buf_end = buf_start + buf_valid; + + // If the playhead has jumped behind or far ahead of the buffer, + // seek the decoder and reset. + if playhead < buf_start || playhead > buf_end + reader.sample_rate as u64 { + buffer.reset(playhead); + let _ = reader.seek(playhead); + continue; + } + + // Advance the buffer start to reclaim space behind the playhead. + // Keep a small lookback for sinc interpolation (~32 frames). + let lookback = 64u64; + let advance_to = playhead.saturating_sub(lookback); + if advance_to > buf_start { + buffer.advance_start(advance_to); + } + + // Calculate how far ahead we need to fill. + let buf_start = buffer.start_frame(); + let buf_valid = buffer.valid_frames_count(); + let buf_end = buf_start + buf_valid; + let prefetch_target = + playhead + (PREFETCH_SECONDS * reader.sample_rate as f64) as u64; + + if buf_end >= prefetch_target { + continue; // Already filled far enough ahead. + } + + // Decode more data into the buffer. + match reader.decode_next(&mut decode_buf) { + Ok(0) => {} // EOF + Ok(frames) => { + let was_empty = buffer.valid_frames_count() == 0; + buffer.write_samples(&decode_buf, frames); + if was_empty { + eprintln!("[DiskReader] pool={}: first fill, {} frames, buf_start={}, valid={}", + _pool_index, frames, buffer.start_frame(), buffer.valid_frames_count()); + } + } + Err(e) => { + eprintln!("[DiskReader] Decode error: {}", e); + } + } + } + + // Sleep briefly to avoid busy-spinning when all buffers are full. + std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS)); + } + } +} + +impl Drop for DiskReader { + fn drop(&mut self) { + self.running.store(false, Ordering::Release); + let _ = self.command_tx.push(DiskReaderCommand::Shutdown); + if let Some(handle) = self.thread_handle.take() { + let _ = handle.join(); + } + } +} diff --git a/daw-backend/src/audio/engine.rs b/daw-backend/src/audio/engine.rs index 0ad8b64..45f83ad 100644 --- a/daw-backend/src/audio/engine.rs +++ b/daw-backend/src/audio/engine.rs @@ -67,6 +67,9 @@ pub struct Engine { // Pre-allocated buffer for recording input samples (avoids allocation per callback) recording_sample_buffer: Vec, + // Disk reader for streaming playback of compressed files + disk_reader: Option, + // Callback timing diagnostics (enabled by DAW_AUDIO_DEBUG=1) debug_audio: bool, callback_count: u64, @@ -95,6 +98,15 @@ impl Engine { // Create channel for background chunk generation let (chunk_generation_tx, chunk_generation_rx) = std::sync::mpsc::channel(); + // Shared atomic playhead for UI reads and disk reader + let playhead_atomic = Arc::new(AtomicU64::new(0)); + + // Initialize disk reader with shared playhead + let disk_reader = crate::audio::disk_reader::DiskReader::new( + Arc::clone(&playhead_atomic), + sample_rate, + ); + Self { project: Project::new(sample_rate), audio_pool: AudioClipPool::new(), @@ -110,7 +122,7 @@ impl Engine { query_response_tx, chunk_generation_rx, chunk_generation_tx, - playhead_atomic: Arc::new(AtomicU64::new(0)), + playhead_atomic, next_midi_clip_id_atomic: Arc::new(AtomicU32::new(0)), frames_since_last_event: 0, event_interval_frames, @@ -123,6 +135,7 @@ impl Engine { midi_input_manager: None, metronome: Metronome::new(sample_rate), recording_sample_buffer: Vec::with_capacity(4096), + disk_reader: Some(disk_reader), debug_audio: std::env::var("DAW_AUDIO_DEBUG").map_or(false, |v| v == "1"), callback_count: 0, timing_worst_total_us: 0, @@ -258,12 +271,37 @@ impl Engine { // Forward chunk generation events from background threads while let Ok(event) = self.chunk_generation_rx.try_recv() { - if self.debug_audio { - if let AudioEvent::WaveformChunksReady { pool_index, detail_level, ref chunks } = event { - eprintln!("[AUDIO THREAD] Received {} chunks for pool {} level {}, forwarding to UI", chunks.len(), pool_index, detail_level); + match event { + AudioEvent::WaveformDecodeComplete { pool_index, samples } => { + // Update pool entry with decoded waveform samples + if let Some(file) = self.audio_pool.get_file_mut(pool_index) { + let total = file.frames; + if let crate::audio::pool::AudioStorage::Compressed { + ref mut decoded_for_waveform, + ref mut decoded_frames, + .. + } = file.storage { + eprintln!("[ENGINE] Waveform decode complete for pool {}: {} samples", pool_index, samples.len()); + *decoded_for_waveform = samples; + *decoded_frames = total; + } + // Notify frontend that waveform data is ready + let _ = self.event_tx.push(AudioEvent::AudioDecodeProgress { + pool_index, + decoded_frames: total, + total_frames: total, + }); + } + } + other => { + if self.debug_audio { + if let AudioEvent::WaveformChunksReady { pool_index, detail_level, ref chunks } = other { + eprintln!("[AUDIO THREAD] Received {} chunks for pool {} level {}, forwarding to UI", chunks.len(), pool_index, detail_level); + } + } + let _ = self.event_tx.push(other); } } - let _ = self.event_tx.push(event); } let t_commands = if self.debug_audio { Some(std::time::Instant::now()) } else { None }; @@ -464,6 +502,10 @@ impl Engine { .store(self.playhead, Ordering::Relaxed); // Stop all MIDI notes when seeking to prevent stuck notes self.project.stop_all_notes(); + // Notify disk reader to refill buffers from new position + if let Some(ref mut dr) = self.disk_reader { + dr.send(crate::audio::disk_reader::DiskReaderCommand::Seek { frame: frames }); + } } Command::SetTrackVolume(track_id, volume) => { if let Some(track) = self.project.get_track_mut(track_id) { @@ -1596,7 +1638,7 @@ impl Engine { if let Some(audio_file) = self.audio_pool.get_file(pool_index) { println!("✅ [ENGINE] Found audio file in pool, queuing work in thread pool"); // Clone necessary data for background thread - let data = audio_file.data.clone(); + let data = audio_file.data().to_vec(); let channels = audio_file.channels; let sample_rate = audio_file.sample_rate; let path = audio_file.path.clone(); @@ -1642,6 +1684,164 @@ impl Engine { eprintln!("❌ [ENGINE] Pool index {} not found for waveform generation", pool_index); } } + + Command::ImportAudio(path) => { + let path_str = path.to_string_lossy().to_string(); + + // Step 1: Read metadata (fast — no decoding) + let metadata = match crate::io::read_metadata(&path) { + Ok(m) => m, + Err(e) => { + eprintln!("[ENGINE] ImportAudio failed to read metadata for {:?}: {}", path, e); + return; + } + }; + + let pool_index; + + eprintln!("[ENGINE] ImportAudio: format={:?}, ch={}, sr={}, n_frames={:?}, duration={:.2}s, path={}", + metadata.format, metadata.channels, metadata.sample_rate, metadata.n_frames, metadata.duration, path_str); + + match metadata.format { + crate::io::AudioFormat::Pcm => { + // WAV/AIFF: memory-map the file for instant availability + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) => { + eprintln!("[ENGINE] ImportAudio failed to open {:?}: {}", path, e); + return; + } + }; + + // SAFETY: The file is opened read-only. The mmap is shared + // immutably. We never write to it. + let mmap = match unsafe { memmap2::Mmap::map(&file) } { + Ok(m) => m, + Err(e) => { + eprintln!("[ENGINE] ImportAudio mmap failed for {:?}: {}", path, e); + return; + } + }; + + // Parse WAV header to find PCM data offset and format + let header = match crate::io::parse_wav_header(&mmap) { + Ok(h) => h, + Err(e) => { + eprintln!("[ENGINE] ImportAudio WAV parse failed for {:?}: {}", path, e); + return; + } + }; + + let audio_file = crate::audio::pool::AudioFile::from_mmap( + path.clone(), + mmap, + header.data_offset, + header.sample_format, + header.channels, + header.sample_rate, + header.total_frames, + ); + + pool_index = self.audio_pool.add_file(audio_file); + } + crate::io::AudioFormat::Compressed => { + let sync_decode = std::env::var("DAW_SYNC_DECODE").is_ok(); + + if sync_decode { + // Diagnostic: full synchronous decode to InMemory (bypasses ring buffer) + eprintln!("[ENGINE] DAW_SYNC_DECODE: doing full decode of {:?}", path); + match crate::io::AudioFile::load(&path) { + Ok(loaded) => { + let ext = path.extension() + .and_then(|e| e.to_str()) + .map(|s| s.to_lowercase()); + let audio_file = crate::audio::pool::AudioFile::with_format( + path.clone(), + loaded.data, + loaded.channels, + loaded.sample_rate, + ext, + ); + pool_index = self.audio_pool.add_file(audio_file); + eprintln!("[ENGINE] DAW_SYNC_DECODE: pool_index={}, frames={}", pool_index, loaded.frames); + } + Err(e) => { + eprintln!("[ENGINE] DAW_SYNC_DECODE failed: {}", e); + return; + } + } + } else { + // Normal path: stream decode via disk reader + let ext = path.extension() + .and_then(|e| e.to_str()) + .map(|s| s.to_lowercase()); + + let total_frames = metadata.n_frames.unwrap_or_else(|| { + (metadata.duration * metadata.sample_rate as f64).ceil() as u64 + }); + + let mut audio_file = crate::audio::pool::AudioFile::from_compressed( + path.clone(), + metadata.channels, + metadata.sample_rate, + total_frames, + ext, + ); + + let buffer = crate::audio::disk_reader::DiskReader::create_buffer( + metadata.sample_rate, + metadata.channels, + ); + audio_file.read_ahead = Some(buffer.clone()); + + pool_index = self.audio_pool.add_file(audio_file); + + eprintln!("[ENGINE] Compressed: total_frames={}, pool_index={}, has_disk_reader={}", + total_frames, pool_index, self.disk_reader.is_some()); + + if let Some(ref mut dr) = self.disk_reader { + dr.send(crate::audio::disk_reader::DiskReaderCommand::ActivateFile { + pool_index, + path: path.clone(), + buffer, + }); + } + + // Spawn background thread to decode full file for waveform display + let bg_tx = self.chunk_generation_tx.clone(); + let bg_path = path.clone(); + let _ = std::thread::Builder::new() + .name(format!("waveform-decode-{}", pool_index)) + .spawn(move || { + eprintln!("[WAVEFORM DECODE] Starting full decode of {:?}", bg_path); + match crate::io::AudioFile::load(&bg_path) { + Ok(loaded) => { + eprintln!("[WAVEFORM DECODE] Complete: {} frames, {} channels", + loaded.frames, loaded.channels); + let _ = bg_tx.send(AudioEvent::WaveformDecodeComplete { + pool_index, + samples: loaded.data, + }); + } + Err(e) => { + eprintln!("[WAVEFORM DECODE] Failed to decode {:?}: {}", bg_path, e); + } + } + }); + } + } + } + + // Emit AudioFileReady event + let _ = self.event_tx.push(AudioEvent::AudioFileReady { + pool_index, + path: path_str, + channels: metadata.channels, + sample_rate: metadata.sample_rate, + duration: metadata.duration, + format: metadata.format, + }); + } } } @@ -1885,11 +2085,22 @@ impl Engine { } Query::GetPoolAudioSamples(pool_index) => { match self.audio_pool.get_file(pool_index) { - Some(file) => QueryResponse::PoolAudioSamples(Ok(( - file.data.clone(), - file.sample_rate, - file.channels, - ))), + Some(file) => { + // For Compressed storage, return decoded_for_waveform if available + let samples = match &file.storage { + crate::audio::pool::AudioStorage::Compressed { + decoded_for_waveform, decoded_frames, .. + } if *decoded_frames > 0 => { + decoded_for_waveform.clone() + } + _ => file.data().to_vec(), + }; + QueryResponse::PoolAudioSamples(Ok(( + samples, + file.sample_rate, + file.channels, + ))) + } None => QueryResponse::PoolAudioSamples(Err(format!("Pool index {} not found", pool_index))), } } @@ -2450,6 +2661,13 @@ impl EngineController { } } + /// Import an audio file asynchronously. The engine will memory-map WAV/AIFF + /// files for instant availability, or set up stream decoding for compressed + /// formats. Listen for `AudioEvent::AudioFileReady` to get the pool index. + pub fn import_audio(&mut self, path: std::path::PathBuf) { + let _ = self.command_tx.push(Command::ImportAudio(path)); + } + /// Add a clip to an audio track pub fn add_audio_clip(&mut self, track_id: TrackId, pool_index: usize, start_time: f64, duration: f64, offset: f64) { let _ = self.command_tx.push(Command::AddAudioClip(track_id, pool_index, start_time, duration, offset)); diff --git a/daw-backend/src/audio/mod.rs b/daw-backend/src/audio/mod.rs index 7a2afde..27a33c1 100644 --- a/daw-backend/src/audio/mod.rs +++ b/daw-backend/src/audio/mod.rs @@ -2,6 +2,7 @@ pub mod automation; pub mod bpm_detector; pub mod buffer_pool; pub mod clip; +pub mod disk_reader; pub mod engine; pub mod export; pub mod metronome; @@ -23,7 +24,7 @@ pub use export::{export_audio, ExportFormat, ExportSettings}; pub use metronome::Metronome; pub use midi::{MidiClip, MidiClipId, MidiClipInstance, MidiClipInstanceId, MidiEvent}; pub use midi_pool::MidiClipPool; -pub use pool::{AudioClipPool, AudioFile as PoolAudioFile, AudioPool}; +pub use pool::{AudioClipPool, AudioFile as PoolAudioFile, AudioPool, AudioStorage, PcmSampleFormat}; pub use project::Project; pub use recording::RecordingState; pub use sample_loader::{load_audio_file, SampleData}; diff --git a/daw-backend/src/audio/pool.rs b/daw-backend/src/audio/pool.rs index 2aff9da..7374da8 100644 --- a/daw-backend/src/audio/pool.rs +++ b/daw-backend/src/audio/pool.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::f32::consts::PI; use serde::{Deserialize, Serialize}; @@ -51,30 +52,66 @@ fn windowed_sinc_interpolate(samples: &[f32], frac: f32) -> f32 { result } +/// PCM sample format for memory-mapped audio files +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PcmSampleFormat { + I16, + I24, + F32, +} + +/// How audio data is stored for a pool entry +#[derive(Debug, Clone)] +pub enum AudioStorage { + /// Fully decoded interleaved f32 samples in memory + InMemory(Vec), + + /// Memory-mapped PCM file (WAV/AIFF) — instant load, OS-managed paging + Mapped { + mmap: Arc, + data_offset: usize, + sample_format: PcmSampleFormat, + bytes_per_sample: usize, + total_frames: u64, + }, + + /// Compressed audio — playback handled by disk reader's stream decoder. + /// `decoded_for_waveform` is progressively filled by a background thread. + Compressed { + decoded_for_waveform: Vec, + decoded_frames: u64, + total_frames: u64, + }, +} + /// Audio file stored in the pool #[derive(Debug, Clone)] pub struct AudioFile { pub path: PathBuf, - pub data: Vec, // Interleaved samples + pub storage: AudioStorage, pub channels: u32, pub sample_rate: u32, pub frames: u64, /// Original file format (mp3, ogg, wav, flac, etc.) /// Used to determine if we should preserve lossy encoding during save pub original_format: Option, + /// Read-ahead buffer for streaming playback (Compressed files). + /// When present, `render_from_file` reads from this buffer instead of `data()`. + pub read_ahead: Option>, } impl AudioFile { - /// Create a new AudioFile + /// Create a new AudioFile with in-memory interleaved f32 data pub fn new(path: PathBuf, data: Vec, channels: u32, sample_rate: u32) -> Self { let frames = (data.len() / channels as usize) as u64; Self { path, - data, + storage: AudioStorage::InMemory(data), channels, sample_rate, frames, original_format: None, + read_ahead: None, } } @@ -83,11 +120,164 @@ impl AudioFile { let frames = (data.len() / channels as usize) as u64; Self { path, - data, + storage: AudioStorage::InMemory(data), channels, sample_rate, frames, original_format, + read_ahead: None, + } + } + + /// Create an AudioFile backed by a memory-mapped WAV/AIFF file + pub fn from_mmap( + path: PathBuf, + mmap: memmap2::Mmap, + data_offset: usize, + sample_format: PcmSampleFormat, + channels: u32, + sample_rate: u32, + total_frames: u64, + ) -> Self { + let bytes_per_sample = match sample_format { + PcmSampleFormat::I16 => 2, + PcmSampleFormat::I24 => 3, + PcmSampleFormat::F32 => 4, + }; + Self { + path, + storage: AudioStorage::Mapped { + mmap: Arc::new(mmap), + data_offset, + sample_format, + bytes_per_sample, + total_frames, + }, + channels, + sample_rate, + frames: total_frames, + original_format: Some("wav".to_string()), + read_ahead: None, + } + } + + /// Create a placeholder AudioFile for a compressed format (playback via disk reader) + pub fn from_compressed( + path: PathBuf, + channels: u32, + sample_rate: u32, + total_frames: u64, + original_format: Option, + ) -> Self { + Self { + path, + storage: AudioStorage::Compressed { + decoded_for_waveform: Vec::new(), + decoded_frames: 0, + total_frames, + }, + channels, + sample_rate, + frames: total_frames, + original_format, + read_ahead: None, + } + } + + /// Get interleaved f32 sample data. + /// + /// - **InMemory**: returns the full slice directly. + /// - **Mapped F32**: reinterprets the mmap'd bytes as `&[f32]` (zero-copy). + /// - **Mapped I16/I24 or Compressed**: returns an empty slice (use + /// `read_samples()` or the disk reader's `ReadAheadBuffer` instead). + pub fn data(&self) -> &[f32] { + match &self.storage { + AudioStorage::InMemory(data) => data, + AudioStorage::Mapped { + mmap, + data_offset, + sample_format, + total_frames, + .. + } if *sample_format == PcmSampleFormat::F32 => { + let byte_slice = &mmap[*data_offset..]; + let ptr = byte_slice.as_ptr(); + // Check 4-byte alignment (required for f32) + if ptr.align_offset(std::mem::align_of::()) == 0 { + let len = (*total_frames as usize) * self.channels as usize; + let available = byte_slice.len() / 4; + let safe_len = len.min(available); + // SAFETY: pointer is aligned, mmap is read-only and outlives + // this borrow, and we clamp to the available byte range. + unsafe { std::slice::from_raw_parts(ptr as *const f32, safe_len) } + } else { + &[] + } + } + _ => &[], + } + } + + /// Read samples for a specific channel into the output buffer. + /// Works for InMemory and Mapped storage. Returns the number of frames read. + pub fn read_samples( + &self, + start_frame: usize, + count: usize, + channel: usize, + out: &mut [f32], + ) -> usize { + let channels = self.channels as usize; + let total_frames = self.frames as usize; + + match &self.storage { + AudioStorage::InMemory(data) => { + let mut written = 0; + for i in 0..count.min(out.len()) { + let frame = start_frame + i; + if frame >= total_frames { break; } + let idx = frame * channels + channel; + out[i] = data[idx]; + written += 1; + } + written + } + AudioStorage::Mapped { mmap, data_offset, sample_format, bytes_per_sample, .. } => { + let mut written = 0; + for i in 0..count.min(out.len()) { + let frame = start_frame + i; + if frame >= total_frames { break; } + let sample_index = frame * channels + channel; + let byte_offset = data_offset + sample_index * bytes_per_sample; + let end = byte_offset + bytes_per_sample; + if end > mmap.len() { break; } + let bytes = &mmap[byte_offset..end]; + out[i] = match sample_format { + PcmSampleFormat::I16 => { + let val = i16::from_le_bytes([bytes[0], bytes[1]]); + val as f32 / 32768.0 + } + PcmSampleFormat::I24 => { + // Sign-extend 24-bit to 32-bit + let val = ((bytes[0] as i32) + | ((bytes[1] as i32) << 8) + | ((bytes[2] as i32) << 16)) + << 8 + >> 8; + val as f32 / 8388608.0 + } + PcmSampleFormat::F32 => { + f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + } + }; + written += 1; + } + written + } + AudioStorage::Compressed { .. } => { + // Compressed files are read through the disk reader + 0 + } } } @@ -140,12 +330,13 @@ impl AudioFile { let mut max = f32::MIN; // Scan all samples in this window + let data = self.data(); for frame_idx in peak_start..peak_end { // For multi-channel audio, combine all channels for ch in 0..self.channels as usize { let sample_idx = frame_idx * self.channels as usize + ch; - if sample_idx < self.data.len() { - let sample = self.data[sample_idx]; + if sample_idx < data.len() { + let sample = data[sample_idx]; min = min.min(sample); max = max.max(sample); } @@ -241,6 +432,11 @@ impl AudioClipPool { self.files.get(index) } + /// Get a mutable reference to an audio file by index + pub fn get_file_mut(&mut self, index: usize) -> Option<&mut AudioFile> { + self.files.get_mut(index) + } + /// Get number of files in the pool pub fn file_count(&self) -> usize { self.files.len() @@ -262,104 +458,172 @@ impl AudioClipPool { return 0; }; + let audio_data = audio_file.data(); + let read_ahead = audio_file.read_ahead.as_deref(); + let use_read_ahead = audio_data.is_empty(); let src_channels = audio_file.channels as usize; + + // Nothing to render: no data and no read-ahead buffer + if use_read_ahead && read_ahead.is_none() { + // Log once per pool_index to diagnose silent clips + static LOGGED: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(u64::MAX); + let prev = LOGGED.swap(pool_index as u64, std::sync::atomic::Ordering::Relaxed); + if prev != pool_index as u64 { + eprintln!("[RENDER] pool={}: data empty, no read_ahead! storage={:?}, frames={}", + pool_index, std::mem::discriminant(&audio_file.storage), audio_file.frames); + } + return 0; + } + + // Snapshot the read-ahead buffer range once for the entire render call. + // This ensures all sinc interpolation taps within a single callback + // see a consistent range, preventing crackle from concurrent updates. + let (ra_start, ra_end) = if use_read_ahead { + read_ahead.unwrap().snapshot() + } else { + (0, 0) + }; + + // Buffer-miss counter: how many times we wanted a sample the ring + // buffer didn't have (frame in file range but outside buffer range). + let mut buffer_misses: u32 = 0; + + // Read a single interleaved sample by (frame, channel). + // Uses direct slice access for InMemory/Mapped, or the disk reader's + // ReadAheadBuffer for compressed files. + macro_rules! get_sample { + ($frame:expr, $ch:expr) => {{ + if use_read_ahead { + let f = $frame as u64; + let s = read_ahead.unwrap().read_sample(f, $ch, ra_start, ra_end); + if s == 0.0 && (f < ra_start || f >= ra_end) { + buffer_misses += 1; + } + s + } else { + let idx = ($frame) * src_channels + ($ch); + if idx < audio_data.len() { audio_data[idx] } else { 0.0 } + } + }}; + } let dst_channels = engine_channels as usize; let output_frames = output.len() / dst_channels; - // Calculate starting position in source with fractional precision let src_start_position = start_time_seconds * audio_file.sample_rate as f64; - // Sample rate conversion ratio - let rate_ratio = audio_file.sample_rate as f64 / engine_sample_rate as f64; - - // Kernel size for windowed sinc (32 taps = high quality, good performance) - const KERNEL_SIZE: usize = 32; - const HALF_KERNEL: usize = KERNEL_SIZE / 2; - let mut rendered_frames = 0; - // Render frame by frame with windowed sinc interpolation - for output_frame in 0..output_frames { - // Calculate exact fractional position in source - let src_position = src_start_position + (output_frame as f64 * rate_ratio); - let src_frame = src_position.floor() as i32; - let frac = (src_position - src_frame as f64) as f32; + if audio_file.sample_rate == engine_sample_rate { + // Fast path: matching sample rates — direct sample copy, no interpolation + let src_start_frame = src_start_position.floor() as i64; - // Check if we've gone past the end of the audio file - if src_frame < 0 || src_frame as usize >= audio_file.frames as usize { - break; + // Continuity check: detect gaps/overlaps between consecutive callbacks (DAW_AUDIO_DEBUG=1) + if std::env::var("DAW_AUDIO_DEBUG").is_ok() { + use std::sync::atomic::{AtomicI64, Ordering as AO}; + static EXPECTED_NEXT: AtomicI64 = AtomicI64::new(-1); + static DISCONTINUITIES: AtomicI64 = AtomicI64::new(0); + let expected = EXPECTED_NEXT.load(AO::Relaxed); + if expected >= 0 && src_start_frame != expected { + let count = DISCONTINUITIES.fetch_add(1, AO::Relaxed) + 1; + eprintln!("[RENDER CONTINUITY] DISCONTINUITY #{}: expected frame {}, got {} (delta={})", + count, expected, src_start_frame, src_start_frame - expected); + } + EXPECTED_NEXT.store(src_start_frame + output_frames as i64, AO::Relaxed); } - // Interpolate each channel - for dst_ch in 0..dst_channels { - let sample = if src_channels == dst_channels { - // Direct channel mapping - let ch_offset = dst_ch; + for output_frame in 0..output_frames { + let src_frame = src_start_frame + output_frame as i64; + if src_frame < 0 || src_frame as u64 >= audio_file.frames { + break; + } + let sf = src_frame as usize; - // Extract channel samples for interpolation (stack-allocated) - let mut channel_samples = [0.0f32; KERNEL_SIZE]; - for (j, i) in (-(HALF_KERNEL as i32)..(HALF_KERNEL as i32)).enumerate() { - let idx = src_frame + i; - if idx >= 0 && (idx as usize) < audio_file.frames as usize { - let sample_idx = (idx as usize) * src_channels + ch_offset; - channel_samples[j] = audio_file.data[sample_idx]; + for dst_ch in 0..dst_channels { + let sample = if src_channels == dst_channels { + get_sample!(sf, dst_ch) + } else if src_channels == 1 { + get_sample!(sf, 0) + } else if dst_channels == 1 { + let mut sum = 0.0f32; + for src_ch in 0..src_channels { + sum += get_sample!(sf, src_ch); } - } + sum / src_channels as f32 + } else { + get_sample!(sf, dst_ch % src_channels) + }; - windowed_sinc_interpolate(&channel_samples, frac) + output[output_frame * dst_channels + dst_ch] += sample * gain; + } - } else if src_channels == 1 && dst_channels > 1 { - // Mono to stereo - duplicate - let mut channel_samples = [0.0f32; KERNEL_SIZE]; - for (j, i) in (-(HALF_KERNEL as i32)..(HALF_KERNEL as i32)).enumerate() { - let idx = src_frame + i; - if idx >= 0 && (idx as usize) < audio_file.frames as usize { - channel_samples[j] = audio_file.data[idx as usize]; + rendered_frames += 1; + } + } else { + // Sample rate conversion with windowed sinc interpolation + let rate_ratio = audio_file.sample_rate as f64 / engine_sample_rate as f64; + const KERNEL_SIZE: usize = 32; + const HALF_KERNEL: usize = KERNEL_SIZE / 2; + + for output_frame in 0..output_frames { + let src_position = src_start_position + (output_frame as f64 * rate_ratio); + let src_frame = src_position.floor() as i32; + let frac = (src_position - src_frame as f64) as f32; + + if src_frame < 0 || src_frame as usize >= audio_file.frames as usize { + break; + } + + for dst_ch in 0..dst_channels { + let src_ch = if src_channels == dst_channels { + dst_ch + } else if src_channels == 1 { + 0 + } else if dst_channels == 1 { + usize::MAX // sentinel: average all channels below + } else { + dst_ch % src_channels + }; + + let sample = if src_ch == usize::MAX { + let mut sum = 0.0; + for ch in 0..src_channels { + let mut channel_samples = [0.0f32; KERNEL_SIZE]; + for (j, i) in (-(HALF_KERNEL as i32)..(HALF_KERNEL as i32)).enumerate() { + let idx = src_frame + i; + if idx >= 0 && (idx as usize) < audio_file.frames as usize { + channel_samples[j] = get_sample!(idx as usize, ch); + } + } + sum += windowed_sinc_interpolate(&channel_samples, frac); } - } - - windowed_sinc_interpolate(&channel_samples, frac) - - } else if src_channels > 1 && dst_channels == 1 { - // Multi-channel to mono - average all source channels - let mut sum = 0.0; - - for src_ch in 0..src_channels { + sum / src_channels as f32 + } else { let mut channel_samples = [0.0f32; KERNEL_SIZE]; for (j, i) in (-(HALF_KERNEL as i32)..(HALF_KERNEL as i32)).enumerate() { let idx = src_frame + i; if idx >= 0 && (idx as usize) < audio_file.frames as usize { - let sample_idx = (idx as usize) * src_channels + src_ch; - channel_samples[j] = audio_file.data[sample_idx]; + channel_samples[j] = get_sample!(idx as usize, src_ch); } } - sum += windowed_sinc_interpolate(&channel_samples, frac); - } + windowed_sinc_interpolate(&channel_samples, frac) + }; - sum / src_channels as f32 + output[output_frame * dst_channels + dst_ch] += sample * gain; + } - } else { - // Mismatched channels - use modulo mapping - let src_ch = dst_ch % src_channels; - - let mut channel_samples = [0.0f32; KERNEL_SIZE]; - for (j, i) in (-(HALF_KERNEL as i32)..(HALF_KERNEL as i32)).enumerate() { - let idx = src_frame + i; - if idx >= 0 && (idx as usize) < audio_file.frames as usize { - let sample_idx = (idx as usize) * src_channels + src_ch; - channel_samples[j] = audio_file.data[sample_idx]; - } - } - - windowed_sinc_interpolate(&channel_samples, frac) - }; - - // Mix into output with gain - let output_idx = output_frame * dst_channels + dst_ch; - output[output_idx] += sample * gain; + rendered_frames += 1; } + } - rendered_frames += 1; + if use_read_ahead && buffer_misses > 0 { + static MISS_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let total = MISS_COUNT.fetch_add(buffer_misses as u64, std::sync::atomic::Ordering::Relaxed) + buffer_misses as u64; + // Log every 100 misses to avoid flooding + if total % 100 < buffer_misses as u64 { + eprintln!("[RENDER] buffer misses this call: {}, total: {}, snap=[{}..{}], src_start_frame={}", + buffer_misses, total, ra_start, ra_end, + (start_time_seconds * audio_file.sample_rate as f64) as u64); + } } rendered_frames * dst_channels @@ -585,7 +849,7 @@ impl AudioClipPool { // For lossless/PCM or if we couldn't read the original lossy file, // convert the f32 interleaved samples to WAV format bytes let wav_data = Self::encode_wav( - &audio_file.data, + audio_file.data(), audio_file.channels, audio_file.sample_rate ); diff --git a/daw-backend/src/audio/track.rs b/daw-backend/src/audio/track.rs index 1ab2278..94afa85 100644 --- a/daw-backend/src/audio/track.rs +++ b/daw-backend/src/audio/track.rs @@ -795,12 +795,12 @@ impl AudioTrack { } } - // Put the buffer back for reuse next callback - self.clip_render_buffer = clip_buffer; - // Process through the effects graph (this will write to output buffer) self.effects_graph.process(output, &[], playhead_seconds); + // Put the buffer back for reuse next callback + self.clip_render_buffer = clip_buffer; + // Evaluate and apply automation let effective_volume = self.evaluate_automation_at_time(playhead_seconds); @@ -874,8 +874,8 @@ impl AudioTrack { // For now, render in a simpler way - iterate through the timeline range // and use get_content_position for each sample position - let output_start_offset = ((render_start_seconds - playhead_seconds) * samples_per_second) as usize; - let output_end_offset = ((render_end_seconds - playhead_seconds) * samples_per_second) as usize; + let output_start_offset = ((render_start_seconds - playhead_seconds) * samples_per_second + 0.5) as usize; + let output_end_offset = ((render_end_seconds - playhead_seconds) * samples_per_second + 0.5) as usize; if output_end_offset > output.len() || output_start_offset > output.len() { return 0; diff --git a/daw-backend/src/command/types.rs b/daw-backend/src/command/types.rs index 7ab3563..67cee46 100644 --- a/daw-backend/src/command/types.rs +++ b/daw-backend/src/command/types.rs @@ -186,6 +186,13 @@ pub enum Command { chunk_indices: Vec, priority: u8, // 0=Low, 1=Medium, 2=High }, + + // Async audio import + /// Import an audio file asynchronously. The engine probes the file format + /// and either memory-maps it (WAV/AIFF) or sets up stream decode + /// (compressed). Emits `AudioFileReady` when playback-ready and + /// `AudioDecodeProgress` for compressed files as waveform data is decoded. + ImportAudio(std::path::PathBuf), } /// Events sent from audio thread back to UI/control thread @@ -253,6 +260,33 @@ pub enum AudioEvent { detail_level: u8, chunks: Vec<(u32, (f64, f64), Vec)>, }, + + /// An audio file has been imported and is ready for playback. + /// For WAV/AIFF: the file is memory-mapped. For compressed: the disk + /// reader is stream-decoding ahead of the playhead. + AudioFileReady { + pool_index: usize, + path: String, + channels: u32, + sample_rate: u32, + duration: f64, + format: crate::io::audio_file::AudioFormat, + }, + + /// Progressive decode progress for a compressed audio file's waveform data. + /// The UI can use this to update waveform display incrementally. + AudioDecodeProgress { + pool_index: usize, + decoded_frames: u64, + total_frames: u64, + }, + + /// Background waveform decode completed for a compressed audio file. + /// Internal event — consumed by the engine to update the pool, not forwarded to UI. + WaveformDecodeComplete { + pool_index: usize, + samples: Vec, + }, } /// Synchronous queries sent from UI thread to audio thread diff --git a/daw-backend/src/io/audio_file.rs b/daw-backend/src/io/audio_file.rs index fff2ed1..6a2d841 100644 --- a/daw-backend/src/io/audio_file.rs +++ b/daw-backend/src/io/audio_file.rs @@ -31,6 +31,25 @@ pub struct WaveformChunk { pub peaks: Vec, // Variable length based on level } +/// Whether an audio file is uncompressed (WAV/AIFF — can be memory-mapped) or compressed +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AudioFormat { + /// Uncompressed PCM (WAV, AIFF) — suitable for memory mapping + Pcm, + /// Compressed (MP3, FLAC, OGG, AAC, etc.) — requires decoding + Compressed, +} + +/// Audio file metadata obtained without decoding +#[derive(Debug, Clone)] +pub struct AudioMetadata { + pub channels: u32, + pub sample_rate: u32, + pub duration: f64, + pub n_frames: Option, + pub format: AudioFormat, +} + pub struct AudioFile { pub data: Vec, pub channels: u32, @@ -38,6 +57,179 @@ pub struct AudioFile { pub frames: u64, } +/// Read only metadata from an audio file without decoding any audio packets. +/// This is fast (sub-millisecond) and suitable for calling on the UI thread. +pub fn read_metadata>(path: P) -> Result { + let path = path.as_ref(); + + let file = std::fs::File::open(path) + .map_err(|e| format!("Failed to open file: {}", e))?; + + let mss = MediaSourceStream::new(Box::new(file), Default::default()); + + let mut hint = Hint::new(); + let ext = path.extension().and_then(|e| e.to_str()).map(|s| s.to_lowercase()); + if let Some(ref ext_str) = ext { + hint.with_extension(ext_str); + } + + let probed = symphonia::default::get_probe() + .format(&hint, mss, &FormatOptions::default(), &MetadataOptions::default()) + .map_err(|e| format!("Failed to probe file: {}", e))?; + + let format = probed.format; + + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + .ok_or_else(|| "No audio tracks found".to_string())?; + + let codec_params = &track.codec_params; + let channels = codec_params.channels + .ok_or_else(|| "Channel count not specified".to_string())? + .count() as u32; + let sample_rate = codec_params.sample_rate + .ok_or_else(|| "Sample rate not specified".to_string())?; + let n_frames = codec_params.n_frames; + + // Determine duration from frame count or time base + let duration = if let Some(frames) = n_frames { + frames as f64 / sample_rate as f64 + } else if let Some(tb) = codec_params.time_base { + if let Some(dur) = codec_params.n_frames { + tb.calc_time(dur).seconds as f64 + tb.calc_time(dur).frac + } else { + 0.0 + } + } else { + 0.0 + }; + + // Determine if this is a PCM format (WAV/AIFF) or compressed + let audio_format = match ext.as_deref() { + Some("wav") | Some("wave") | Some("aiff") | Some("aif") => AudioFormat::Pcm, + _ => AudioFormat::Compressed, + }; + + Ok(AudioMetadata { + channels, + sample_rate, + duration, + n_frames, + format: audio_format, + }) +} + +/// Parsed WAV header info needed for memory-mapping. +pub struct WavHeaderInfo { + pub data_offset: usize, + pub data_size: usize, + pub sample_format: crate::audio::pool::PcmSampleFormat, + pub channels: u32, + pub sample_rate: u32, + pub total_frames: u64, +} + +/// Parse a WAV file header from a byte slice (e.g. from an mmap). +/// Returns the byte offset to PCM data and format details. +pub fn parse_wav_header(data: &[u8]) -> Result { + if data.len() < 44 { + return Err("File too small to be a valid WAV".to_string()); + } + + // RIFF header + if &data[0..4] != b"RIFF" || &data[8..12] != b"WAVE" { + return Err("Not a valid RIFF/WAVE file".to_string()); + } + + // Walk chunks to find "fmt " and "data" + let mut pos = 12; + let mut fmt_found = false; + let mut channels: u32 = 0; + let mut sample_rate: u32 = 0; + let mut bits_per_sample: u16 = 0; + let mut format_code: u16 = 0; + + let mut data_offset: usize = 0; + let mut data_size: usize = 0; + + while pos + 8 <= data.len() { + let chunk_id = &data[pos..pos + 4]; + let chunk_size = u32::from_le_bytes([ + data[pos + 4], + data[pos + 5], + data[pos + 6], + data[pos + 7], + ]) as usize; + + if chunk_id == b"fmt " { + if pos + 8 + 16 > data.len() { + return Err("fmt chunk too small".to_string()); + } + let base = pos + 8; + format_code = u16::from_le_bytes([data[base], data[base + 1]]); + channels = u16::from_le_bytes([data[base + 2], data[base + 3]]) as u32; + sample_rate = u32::from_le_bytes([ + data[base + 4], + data[base + 5], + data[base + 6], + data[base + 7], + ]); + bits_per_sample = u16::from_le_bytes([data[base + 14], data[base + 15]]); + fmt_found = true; + } else if chunk_id == b"data" { + data_offset = pos + 8; + data_size = chunk_size; + break; + } + + // Advance to next chunk (chunks are 2-byte aligned) + pos += 8 + chunk_size; + if chunk_size % 2 != 0 { + pos += 1; + } + } + + if !fmt_found { + return Err("No fmt chunk found".to_string()); + } + if data_offset == 0 { + return Err("No data chunk found".to_string()); + } + + // Determine sample format + let sample_format = match (format_code, bits_per_sample) { + (1, 16) => crate::audio::pool::PcmSampleFormat::I16, + (1, 24) => crate::audio::pool::PcmSampleFormat::I24, + (3, 32) => crate::audio::pool::PcmSampleFormat::F32, + (1, 32) => crate::audio::pool::PcmSampleFormat::F32, // 32-bit PCM treated as float + _ => { + return Err(format!( + "Unsupported WAV format: code={}, bits={}", + format_code, bits_per_sample + )); + } + }; + + let bytes_per_sample = (bits_per_sample / 8) as usize; + let bytes_per_frame = bytes_per_sample * channels as usize; + let total_frames = if bytes_per_frame > 0 { + (data_size / bytes_per_frame) as u64 + } else { + 0 + }; + + Ok(WavHeaderInfo { + data_offset, + data_size, + sample_format, + channels, + sample_rate, + total_frames, + }) +} + impl AudioFile { /// Load an audio file from disk and decode it to interleaved f32 samples pub fn load>(path: P) -> Result { diff --git a/daw-backend/src/io/mod.rs b/daw-backend/src/io/mod.rs index 3a1f730..b5b68a1 100644 --- a/daw-backend/src/io/mod.rs +++ b/daw-backend/src/io/mod.rs @@ -3,7 +3,7 @@ pub mod midi_file; pub mod midi_input; pub mod wav_writer; -pub use audio_file::{AudioFile, WaveformChunk, WaveformChunkKey, WaveformPeak}; +pub use audio_file::{AudioFile, AudioFormat, AudioMetadata, WavHeaderInfo, WaveformChunk, WaveformChunkKey, WaveformPeak, parse_wav_header, read_metadata}; pub use midi_file::load_midi_file; pub use midi_input::MidiInputManager; pub use wav_writer::WavWriter; diff --git a/lightningbeam-ui/Cargo.lock b/lightningbeam-ui/Cargo.lock index ba9b184..c154343 100644 --- a/lightningbeam-ui/Cargo.lock +++ b/lightningbeam-ui/Cargo.lock @@ -1661,6 +1661,7 @@ dependencies = [ "dasp_signal", "ffmpeg-next", "hound", + "memmap2", "midir", "midly", "pathdiff", diff --git a/lightningbeam-ui/lightningbeam-editor/src/main.rs b/lightningbeam-ui/lightningbeam-editor/src/main.rs index e521d5a..066b4b8 100644 --- a/lightningbeam-ui/lightningbeam-editor/src/main.rs +++ b/lightningbeam-ui/lightningbeam-editor/src/main.rs @@ -2318,9 +2318,14 @@ impl EditorApp { } } - /// Import an audio file via daw-backend + /// Import an audio file via daw-backend (async — non-blocking) + /// + /// Reads only metadata from the file (sub-millisecond), then sends the path + /// to the engine for async import. The engine memory-maps WAV files or sets + /// up stream decoding for compressed formats. An `AudioFileReady` event is + /// emitted when the file is playback-ready; the event handler populates the + /// GPU waveform cache. fn import_audio(&mut self, path: &std::path::Path) -> Option { - use daw_backend::io::audio_file::AudioFile; use lightningbeam_core::clip::AudioClip; let name = path.file_stem() @@ -2328,69 +2333,47 @@ impl EditorApp { .unwrap_or("Untitled Audio") .to_string(); - // Load audio file via daw-backend - match AudioFile::load(path) { - Ok(audio_file) => { - let duration = audio_file.frames as f64 / audio_file.sample_rate as f64; - let channels = audio_file.channels; - let sample_rate = audio_file.sample_rate; - - // Add to audio engine pool if available - if let Some(ref controller_arc) = self.audio_controller { - let pool_index = { - let mut controller = controller_arc.lock().unwrap(); - // Send audio data to the engine - let path_str = path.to_string_lossy().to_string(); - println!("📤 [UI] Sending AddAudioFile command to engine: {}", path_str); - controller.add_audio_file( - path_str.clone(), - audio_file.data, - channels, - sample_rate, - ); - - // For now, use a placeholder pool index (the engine will assign the real one) - // In a full implementation, we'd wait for the AudioFileAdded event - self.action_executor.document().audio_clips.len() - }; // Controller lock is dropped here - - // Create audio clip in document - let clip = AudioClip::new_sampled(&name, pool_index, duration); - let clip_id = self.action_executor.document_mut().add_audio_clip(clip); - - // Fetch raw audio samples for GPU waveform rendering - if let Some(ref controller_arc) = self.audio_controller { - let mut controller = controller_arc.lock().unwrap(); - match controller.get_pool_audio_samples(pool_index) { - Ok((samples, sr, ch)) => { - println!("✅ Cached {} raw audio samples for GPU waveform", samples.len()); - self.raw_audio_cache.insert(pool_index, (samples, sr, ch)); - self.waveform_gpu_dirty.insert(pool_index); - } - Err(e) => eprintln!("Failed to fetch raw audio: {}", e), - } - } - - println!("Imported audio '{}' ({:.1}s, {}ch, {}Hz) - ID: {}", - name, duration, channels, sample_rate, clip_id); - - Some(ImportedAssetInfo { - clip_id, - clip_type: panes::DragClipType::AudioSampled, - name, - dimensions: None, - duration, - linked_audio_clip_id: None, - }) - } else { - eprintln!("Cannot import audio: audio engine not initialized"); - None - } - } + // Read metadata without decoding (fast — sub-millisecond) + let metadata = match daw_backend::io::read_metadata(path) { + Ok(m) => m, Err(e) => { - eprintln!("Failed to load audio '{}': {}", path.display(), e); - None + eprintln!("Failed to read audio metadata '{}': {}", path.display(), e); + return None; } + }; + + let duration = metadata.duration; + let channels = metadata.channels; + let sample_rate = metadata.sample_rate; + + if let Some(ref controller_arc) = self.audio_controller { + // Predict the pool index (engine assigns sequentially) + let pool_index = self.action_executor.document().audio_clips.len(); + + // Send async import command (non-blocking) + { + let mut controller = controller_arc.lock().unwrap(); + controller.import_audio(path.to_path_buf()); + } + + // Create audio clip in document immediately (metadata is enough) + let clip = AudioClip::new_sampled(&name, pool_index, duration); + let clip_id = self.action_executor.document_mut().add_audio_clip(clip); + + println!("Imported audio '{}' ({:.1}s, {}ch, {}Hz) - ID: {} (pool: {})", + name, duration, channels, sample_rate, clip_id, pool_index); + + Some(ImportedAssetInfo { + clip_id, + clip_type: panes::DragClipType::AudioSampled, + name, + dimensions: None, + duration, + linked_audio_clip_id: None, + }) + } else { + eprintln!("Cannot import audio: audio engine not initialized"); + None } } @@ -3469,6 +3452,44 @@ impl eframe::App for EditorApp { self.recording_layer_id = None; ctx.request_repaint(); } + AudioEvent::AudioFileReady { pool_index, path, channels, sample_rate, duration, format } => { + println!("Audio file ready: pool={}, path={}, ch={}, sr={}, {:.1}s, {:?}", + pool_index, path, channels, sample_rate, duration, format); + // For PCM (mmap'd) files, raw samples are available immediately + // via the pool's data() accessor. Fetch them for GPU waveform. + if format == daw_backend::io::AudioFormat::Pcm { + if let Some(ref controller_arc) = self.audio_controller { + let mut controller = controller_arc.lock().unwrap(); + match controller.get_pool_audio_samples(pool_index) { + Ok((samples, sr, ch)) => { + self.raw_audio_cache.insert(pool_index, (samples, sr, ch)); + self.waveform_gpu_dirty.insert(pool_index); + } + Err(e) => eprintln!("Failed to fetch raw audio for pool {}: {}", pool_index, e), + } + } + } + // For compressed files, waveform data arrives progressively + // via AudioDecodeProgress events. + ctx.request_repaint(); + } + AudioEvent::AudioDecodeProgress { pool_index, decoded_frames, total_frames } => { + // Waveform decode complete — fetch samples for GPU waveform + if decoded_frames == total_frames { + if let Some(ref controller_arc) = self.audio_controller { + let mut controller = controller_arc.lock().unwrap(); + match controller.get_pool_audio_samples(pool_index) { + Ok((samples, sr, ch)) => { + println!("Waveform decode complete for pool {}: {} samples", pool_index, samples.len()); + self.raw_audio_cache.insert(pool_index, (samples, sr, ch)); + self.waveform_gpu_dirty.insert(pool_index); + } + Err(e) => eprintln!("Failed to fetch decoded audio for pool {}: {}", pool_index, e), + } + } + ctx.request_repaint(); + } + } _ => {} // Ignore other events for now } }