Improve further with websockets

This commit is contained in:
Skyler Lehmkuhl 2025-11-07 02:51:23 -05:00
parent 47e1954efe
commit 336b9952e4
8 changed files with 644 additions and 492 deletions

704
src-tauri/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,12 +24,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
tauri-plugin-fs = "2"
tauri-plugin-dialog = "2"
tauri-plugin-log = "2"
tracing = "0.1.41"
# enable env-filter feature
tracing-subscriber = {version = "0.3.19", features = ["env-filter"] }
log = "0.4"
chrono = "0.4"
env_logger = "0.11"
# DAW backend integration
daw-backend = { path = "../daw-backend" }
@ -44,6 +40,9 @@ image = { version = "0.24", default-features = false, features = ["jpeg"] }
# HTTP server for video streaming
tiny_http = "0.12"
# WebSocket for frame streaming (disable default features to remove tracing, but keep handshake)
tungstenite = { version = "0.20", default-features = false, features = ["handshake"] }
[profile.dev]
opt-level = 1 # Enable basic optimizations in debug mode for audio decoding performance

View File

@ -72,7 +72,6 @@
}
]
},
"dialog:default",
"log:default"
"dialog:default"
]
}

View File

@ -0,0 +1,88 @@
use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use std::thread;
use tungstenite::{accept, Message};
/// Broadcasts decoded video frames to frontend clients over a local
/// WebSocket server. See `send_frame` for the binary frame layout.
pub struct FrameStreamer {
// OS-assigned localhost port the TCP listener is bound to.
port: u16,
// Connected clients, shared with the acceptor thread spawned in `new`.
// Clients whose writes fail are dropped from this list in `send_frame`.
clients: Arc<Mutex<Vec<tungstenite::WebSocket<std::net::TcpStream>>>>,
}
impl FrameStreamer {
    /// Create a streamer bound to an OS-assigned localhost port and spawn
    /// a background thread that accepts WebSocket connections.
    ///
    /// # Errors
    /// Returns a descriptive message if the TCP listener cannot be created
    /// or its local address cannot be read.
    pub fn new() -> Result<Self, String> {
        // Bind to localhost on a random available port; the frontend asks
        // for the chosen port via the `get_frame_streamer_port` command.
        let listener = TcpListener::bind("127.0.0.1:0")
            .map_err(|e| format!("Failed to create WebSocket listener: {}", e))?;
        let port = listener.local_addr()
            .map_err(|e| format!("Failed to get listener address: {}", e))?
            .port();

        let clients = Arc::new(Mutex::new(Vec::new()));
        let clients_clone = clients.clone();

        // Acceptor thread: perform the WebSocket handshake for each incoming
        // TCP connection and register the client for frame broadcasts.
        // Failed connections and failed handshakes are silently skipped —
        // streaming is best-effort by design.
        thread::spawn(move || {
            for stream in listener.incoming() {
                if let Ok(stream) = stream {
                    if let Ok(websocket) = accept(stream) {
                        clients_clone.lock().unwrap().push(websocket);
                    }
                }
            }
        });

        Ok(Self { port, clients })
    }

    /// Port the WebSocket server is listening on.
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Send a decoded frame to all connected clients, dropping any client
    /// whose write fails (i.e. it has disconnected).
    ///
    /// Frame format (little-endian):
    /// [pool_index: u32][timestamp_ms: u32][width: u32][height: u32][rgba_data...]
    pub fn send_frame(&self, pool_index: usize, timestamp: f64, width: u32, height: u32, rgba_data: &[u8]) {
        // Build the binary frame message once; it is cloned per client below.
        // (rgba_data is already in RGBA format as produced by the decoder.)
        let mut frame_msg = Vec::with_capacity(16 + rgba_data.len());
        frame_msg.extend_from_slice(&(pool_index as u32).to_le_bytes());
        frame_msg.extend_from_slice(&((timestamp * 1000.0) as u32).to_le_bytes());
        frame_msg.extend_from_slice(&width.to_le_bytes());
        frame_msg.extend_from_slice(&height.to_le_bytes());
        frame_msg.extend_from_slice(rgba_data);

        // Broadcast to every client; `retain_mut` removes disconnected ones.
        // NOTE(review): writes are blocking, so one stalled client could
        // delay the others — confirm acceptable for localhost-only use.
        let mut clients = self.clients.lock().unwrap();
        clients.retain_mut(|client| {
            client.write_message(Message::Binary(frame_msg.clone())).is_ok()
        });
    }
}

View File

@ -1,14 +1,12 @@
use std::{path::PathBuf, sync::{Arc, Mutex}};
use tauri_plugin_log::{Target, TargetKind};
use log::{trace, info, debug, warn, error};
use tracing_subscriber::EnvFilter;
use chrono::Local;
use tauri::{AppHandle, Manager, Url, WebviewUrl, WebviewWindowBuilder};
mod audio;
mod video;
mod video_server;
mod frame_streamer;
#[derive(Default)]
@ -43,6 +41,14 @@ fn error(msg: String) {
error!("{}",msg);
}
/// Tauri command: report the localhost port the WebSocket frame streamer
/// is listening on, so the frontend knows where to connect.
#[tauri::command]
fn get_frame_streamer_port(
    frame_streamer: tauri::State<'_, Arc<Mutex<frame_streamer::FrameStreamer>>>,
) -> u16 {
    frame_streamer.lock().unwrap().port()
}
use tauri::PhysicalSize;
#[tauri::command]
@ -128,17 +134,27 @@ fn handle_file_associations(app: AppHandle, files: Vec<PathBuf>) {
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
let pkg_name = env!("CARGO_PKG_NAME").to_string();
// Initialize env_logger with Error level only
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Error)
.init();
// Initialize video HTTP server
let video_server = video_server::VideoServer::new()
.expect("Failed to start video server");
eprintln!("[App] Video server started on port {}", video_server.port());
// Initialize WebSocket frame streamer
let frame_streamer = frame_streamer::FrameStreamer::new()
.expect("Failed to start frame streamer");
eprintln!("[App] Frame streamer started on port {}", frame_streamer.port());
tauri::Builder::default()
.manage(Mutex::new(AppState::default()))
.manage(Arc::new(Mutex::new(audio::AudioState::default())))
.manage(Arc::new(Mutex::new(video::VideoState::default())))
.manage(Arc::new(Mutex::new(video_server)))
.manage(Arc::new(Mutex::new(frame_streamer)))
.setup(|app| {
#[cfg(any(windows, target_os = "linux"))] // Windows/Linux needs different handling from macOS
{
@ -174,34 +190,38 @@ pub fn run() {
}
Ok(())
})
.plugin(
tauri_plugin_log::Builder::new()
.timezone_strategy(tauri_plugin_log::TimezoneStrategy::UseLocal)
.format(|out, message, record| {
let date = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
out.finish(format_args!(
"{}[{}] {}",
date,
record.level(),
message
))
})
.targets([
Target::new(TargetKind::Stdout),
// LogDir locations:
// Linux: /home/user/.local/share/org.lightningbeam.core/logs
// macOS: /Users/user/Library/Logs/org.lightningbeam.core/logs
// Windows: C:\Users\user\AppData\Local\org.lightningbeam.core\logs
Target::new(TargetKind::LogDir { file_name: Some("logs".to_string()) }),
Target::new(TargetKind::Webview),
])
.build()
)
// .plugin(
// tauri_plugin_log::Builder::new()
// .filter(|metadata| {
// // ONLY allow Error-level logs, block everything else
// metadata.level() == log::Level::Error
// })
// .timezone_strategy(tauri_plugin_log::TimezoneStrategy::UseLocal)
// .format(|out, message, record| {
// let date = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
// out.finish(format_args!(
// "{}[{}] {}",
// date,
// record.level(),
// message
// ))
// })
// .targets([
// Target::new(TargetKind::Stdout),
// // LogDir locations:
// // Linux: /home/user/.local/share/org.lightningbeam.core/logs
// // macOS: /Users/user/Library/Logs/org.lightningbeam.core/logs
// // Windows: C:\Users\user\AppData\Local\org.lightningbeam.core\logs
// Target::new(TargetKind::LogDir { file_name: Some("logs".to_string()) }),
// Target::new(TargetKind::Webview),
// ])
// .build()
// )
.plugin(tauri_plugin_dialog::init())
.plugin(tauri_plugin_fs::init())
.plugin(tauri_plugin_shell::init())
.invoke_handler(tauri::generate_handler![
greet, trace, debug, info, warn, error, create_window,
greet, trace, debug, info, warn, error, create_window, get_frame_streamer_port,
audio::audio_init,
audio::audio_reset,
audio::audio_play,
@ -263,6 +283,7 @@ pub fn run() {
video::video_load_file,
video::video_get_frame,
video::video_get_frames_batch,
video::video_stream_frame,
video::video_set_cache_size,
video::video_get_pool_info,
video::video_ipc_benchmark,
@ -295,5 +316,4 @@ pub fn run() {
}
},
);
tracing_subscriber::fmt().with_env_filter(EnvFilter::new(format!("{}=trace", pkg_name))).init();
}

View File

@ -876,3 +876,46 @@ pub async fn video_get_frames_batch(
Ok(())
}
/// Stream a decoded video frame over WebSocket (zero-copy performance testing).
///
/// Decodes the frame at `timestamp` from pool entry `pool_index` and
/// broadcasts it to all connected WebSocket clients via the frame streamer.
///
/// # Errors
/// Returns an error if `pool_index` is out of range or if decoding fails.
#[tauri::command]
pub async fn video_stream_frame(
    video_state: tauri::State<'_, Arc<Mutex<VideoState>>>,
    frame_streamer: tauri::State<'_, Arc<Mutex<crate::frame_streamer::FrameStreamer>>>,
    pool_index: usize,
    timestamp: f64,
) -> Result<(), String> {
    // Clone the decoder handle inside a scope so the pool lock is released
    // before the (potentially slow) decode happens.
    let decoder = {
        let state = video_state.lock().unwrap();
        state.pool.get(pool_index)
            .ok_or("Invalid pool index")?
            .clone()
    };

    // Decode the frame; get_frame returns RGBA (not RGB). The decoder lock
    // is dropped at the end of this scope, before streaming.
    let (width, height, rgba_data) = {
        let mut decoder = decoder.lock().unwrap();
        let width = decoder.output_width;
        let height = decoder.output_height;
        let rgba_data = decoder.get_frame(timestamp)?;
        (width, height, rgba_data)
    };

    // Broadcast over WebSocket to all subscribed clients.
    frame_streamer
        .lock()
        .unwrap()
        .send_frame(pool_index, timestamp, width, height, &rgba_data);

    Ok(())
}

106
src/frame-receiver.js Normal file
View File

@ -0,0 +1,106 @@
// WebSocket frame receiver for zero-copy video playback
// Uses ArrayBuffer views to avoid copying data
/**
 * Receives binary video frames from the Rust frame streamer over a local
 * WebSocket and dispatches them to per-pool subscribers.
 *
 * Frame wire format (little-endian):
 *   [pool_index: u32][timestamp_ms: u32][width: u32][height: u32][rgba_data...]
 */
export class FrameReceiver {
  constructor() {
    this.ws = null;         // Active WebSocket, or null when disconnected
    this.port = null;       // Port reported by the Rust backend
    this.connected = false; // True between onopen and onclose
    this.frameCallbacks = new Map(); // pool_index -> callback(imageData, timestamp)
  }

  /**
   * Connect to the backend's WebSocket frame streamer.
   * Resolves once the socket is open; rejects on connection error.
   * Safe to call when already connected (no-op instead of reconnecting).
   */
  async connect() {
    // Guard against tearing down a live connection on a repeated call.
    if (this.connected && this.ws) {
      return;
    }
    // Ask the Tauri backend which port the streamer is listening on.
    const { invoke } = window.__TAURI__.core;
    this.port = await invoke('get_frame_streamer_port');
    const wsUrl = `ws://127.0.0.1:${this.port}`;
    console.log(`[FrameReceiver] Connecting to ${wsUrl}`);
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(wsUrl);
      // Receive frames as ArrayBuffer so we can create zero-copy views.
      this.ws.binaryType = 'arraybuffer';
      this.ws.onopen = () => {
        console.log('[FrameReceiver] Connected');
        this.connected = true;
        resolve();
      };
      this.ws.onerror = (error) => {
        console.error('[FrameReceiver] WebSocket error:', error);
        reject(error);
      };
      this.ws.onclose = () => {
        console.log('[FrameReceiver] Disconnected');
        this.connected = false;
      };
      this.ws.onmessage = (event) => {
        this.handleFrame(event.data);
      };
    });
  }

  /**
   * Parse one binary frame message and hand it to the pool's subscriber.
   * Runs once per received frame, so it avoids copies and per-frame logging.
   * @param {ArrayBuffer} arrayBuffer raw WebSocket message payload
   */
  handleFrame(arrayBuffer) {
    // Header is read through a zero-copy DataView into the buffer.
    const view = new DataView(arrayBuffer);
    const poolIndex = view.getUint32(0, true); // little-endian
    const timestampMs = view.getUint32(4, true);
    const width = view.getUint32(8, true);
    const height = view.getUint32(12, true);

    const callback = this.frameCallbacks.get(poolIndex);
    if (!callback) {
      // No subscriber for this pool; drop the frame.
      return;
    }

    // Validate the payload size before constructing ImageData (which throws
    // on a length mismatch) so a truncated message fails loudly but safely.
    const dataLength = width * height * 4;
    if (arrayBuffer.byteLength < 16 + dataLength) {
      console.error(
        `[FrameReceiver] Truncated frame for pool ${poolIndex}: ` +
        `expected ${16 + dataLength} bytes, got ${arrayBuffer.byteLength}`
      );
      return;
    }

    // Zero-copy view of the RGBA pixels, starting at byte 16.
    // ImageData requires Uint8ClampedArray; the exact length avoids stride issues.
    const rgbaData = new Uint8ClampedArray(arrayBuffer, 16, dataLength);
    const imageData = new ImageData(rgbaData, width, height);

    // Deliver the frame with the timestamp converted back to seconds.
    callback(imageData, timestampMs / 1000.0);
  }

  /**
   * Subscribe to decoded frames for a specific video pool.
   * Replaces any existing subscriber for that pool.
   */
  subscribe(poolIndex, callback) {
    console.log(`[FrameReceiver] Subscribing to pool ${poolIndex}`);
    this.frameCallbacks.set(poolIndex, callback);
  }

  /** Stop receiving frames for a video pool. */
  unsubscribe(poolIndex) {
    console.log(`[FrameReceiver] Unsubscribing from pool ${poolIndex}`);
    this.frameCallbacks.delete(poolIndex);
  }

  /** Close the socket and drop all subscriptions. */
  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
    this.connected = false;
    this.frameCallbacks.clear();
  }
}

// Global singleton instance shared by all video layers.
export const frameReceiver = new FrameReceiver();

View File

@ -13,6 +13,7 @@ import {
getShapeAtPoint,
generateWaveform
} from '../utils.js';
import { frameReceiver } from '../frame-receiver.js';
// External libraries (globals)
const Tone = window.Tone;
@ -1284,6 +1285,10 @@ class VideoLayer extends Widget {
this.useJpegCompression = false; // JPEG compression adds more overhead than it saves (default: false)
this.prefetchCount = 3; // Number of frames to prefetch ahead of playhead
// WebSocket streaming (experimental - zero-copy RGBA frames from Rust)
this.useWebSocketStreaming = true; // Use WebSocket streaming (enabled for testing)
this.wsConnected = false; // Track WebSocket connection status
// Timeline display
this.collapsed = false;
this.curvesMode = 'segment';
@ -1319,8 +1324,38 @@ class VideoLayer extends Widget {
console.log(`Video clip added: ${name}, ${width}x${height}, duration: ${duration}s, browser-compatible: ${clip.isBrowserCompatible}, http_url: ${clip.httpUrl}`);
// If HTTP URL is available, create video element immediately
if (clip.httpUrl) {
// If using WebSocket streaming, connect and subscribe
if (this.useWebSocketStreaming) {
// Connect to WebSocket if not already connected
if (!this.wsConnected) {
try {
await frameReceiver.connect();
this.wsConnected = true;
console.log(`[Video] WebSocket connected for streaming`);
} catch (error) {
console.error('[Video] Failed to connect WebSocket, falling back to browser video:', error);
this.useWebSocketStreaming = false;
}
}
// Subscribe to frames for this pool
if (this.wsConnected) {
frameReceiver.subscribe(poolIndex, (imageData, timestamp) => {
// Store received frame
clip.wsCurrentFrame = imageData;
clip.wsLastTimestamp = timestamp;
// console.log(`[Video WS] Received frame ${width}x${height} @ ${timestamp.toFixed(3)}s`);
// Trigger UI redraw
if (updateUI) {
updateUI();
}
});
console.log(`[Video] Subscribed to WebSocket frames for pool ${poolIndex}`);
}
}
// Otherwise use browser video if available
else if (clip.httpUrl) {
await this._createVideoElement(clip);
clip.useBrowserVideo = true;
}
@ -1440,6 +1475,7 @@ class VideoLayer extends Widget {
if (currentTime < clip.startTime ||
currentTime >= clip.startTime + clip.duration) {
clip.currentFrame = null;
clip.wsCurrentFrame = null;
// Pause video element if we left its time range
if (clip.videoElement && clip.isPlaying) {
@ -1450,6 +1486,24 @@ class VideoLayer extends Widget {
continue;
}
// If using WebSocket streaming
if (this.useWebSocketStreaming && this.wsConnected) {
const videoTime = clip.offset + (currentTime - clip.startTime);
// Request frame via WebSocket streaming (non-blocking)
// The frame will arrive via the subscription callback and trigger a redraw
try {
await invoke('video_stream_frame', {
poolIndex: clip.poolIndex,
timestamp: videoTime
});
} catch (error) {
console.error('[Video WS] Failed to stream frame:', error);
}
continue; // Skip other frame fetching methods
}
// If using browser video element
if (clip.useBrowserVideo && clip.videoElement) {
const videoTime = clip.offset + (currentTime - clip.startTime);
@ -1687,13 +1741,50 @@ class VideoLayer extends Widget {
}
// Debug: log what path we're taking
if (!clip._drawPathLogged) {
console.log(`[Video Draw] useBrowserVideo=${clip.useBrowserVideo}, videoElement=${!!clip.videoElement}, currentFrame=${!!clip.currentFrame}`);
clip._drawPathLogged = true;
}
// if (!clip._drawPathLogged) {
// console.log(`[Video Draw] useWebSocketStreaming=${this.useWebSocketStreaming}, wsCurrentFrame=${!!clip.wsCurrentFrame}, useBrowserVideo=${clip.useBrowserVideo}, videoElement=${!!clip.videoElement}, currentFrame=${!!clip.currentFrame}`);
// clip._drawPathLogged = true;
// }
// Prefer WebSocket streaming if available
if (this.useWebSocketStreaming && clip.wsCurrentFrame) {
try {
// Create a temporary canvas to hold the ImageData
if (!clip._wsCanvas) {
clip._wsCanvas = document.createElement('canvas');
}
const tempCanvas = clip._wsCanvas;
// Set temp canvas size to match ImageData dimensions
if (tempCanvas.width !== clip.wsCurrentFrame.width || tempCanvas.height !== clip.wsCurrentFrame.height) {
tempCanvas.width = clip.wsCurrentFrame.width;
tempCanvas.height = clip.wsCurrentFrame.height;
}
// Put ImageData on temp canvas (zero-copy)
const tempCtx = tempCanvas.getContext('2d');
tempCtx.putImageData(clip.wsCurrentFrame, 0, 0);
// Scale to fit canvas while maintaining aspect ratio
const canvasWidth = config.fileWidth;
const canvasHeight = config.fileHeight;
const scale = Math.min(
canvasWidth / clip.width,
canvasHeight / clip.height
);
const scaledWidth = clip.width * scale;
const scaledHeight = clip.height * scale;
const x = (canvasWidth - scaledWidth) / 2;
const y = (canvasHeight - scaledHeight) / 2;
// Draw scaled to main canvas (GPU-accelerated)
ctx.drawImage(tempCanvas, x, y, scaledWidth, scaledHeight);
} catch (error) {
console.error('[Video WS Draw] Failed to draw WebSocket frame:', error);
}
}
// Prefer browser video element if available
if (clip.useBrowserVideo && clip.videoElement) {
else if (clip.useBrowserVideo && clip.videoElement) {
// Debug: log readyState issues
if (clip.videoElement.readyState < 2) {
if (!clip._readyStateWarned) {