diff --git a/Cargo.lock b/Cargo.lock index 803184e54..912150d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -481,6 +481,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9a7b350e3bb1767102698302bc37256cbd48422809984b98d292c40e2579aa9" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b092fe214090261288111db7a2b2c2118e5a7f30dc2569f1732c4069a6840549" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.6.20" @@ -1024,6 +1046,15 @@ dependencies = [ "error-code", ] +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + [[package]] name = "cocoa" version = "0.20.2" @@ -1724,6 +1755,12 @@ version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5" +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "ecdsa" version = "0.16.9" @@ -2268,6 +2305,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.31" @@ -2895,11 +2938,11 @@ dependencies = [ "http 1.4.0", "hyper 1.8.1", "hyper-util", - "rustls 0.23.36", + "rustls", "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower-service", "webpki-roots 1.0.5", ] @@ -3576,7 +3619,7 @@ dependencies = [ "sha2", "thiserror 1.0.69", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-tungstenite", "url", ] @@ -5220,7 +5263,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls 0.23.36", + "rustls", "socket2 0.6.2", "thiserror 2.0.18", "tokio", @@ -5240,7 +5283,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash 2.1.1", - "rustls 0.23.36", + "rustls", "rustls-pki-types", "slab", "thiserror 2.0.18", @@ -5455,7 +5498,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.36", + "rustls", "rustls-native-certs 0.8.3", "rustls-pki-types", "serde", @@ -5464,7 +5507,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-native-tls", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower 0.5.3", "tower-http", "tower-service", @@ -5611,28 +5654,18 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.23.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ + "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.9", + "rustls-webpki", "subtle", "zeroize", ] @@ -5680,22 +5713,13 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.103.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -5791,16 +5815,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sctk-adwaita" version = "0.10.1" @@ -6556,23 +6570,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls 0.23.36", + "rustls", "tokio", ] @@ -6589,20 +6593,21 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.1" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" dependencies = [ "futures-util", "log", "native-tls", - "rustls 0.21.12", - "rustls-native-certs 0.6.3", + "rustls", + "rustls-native-certs 0.8.3", + "rustls-pki-types", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.1", - "tungstenite 0.20.1", - "webpki-roots 0.25.4", + "tokio-rustls", + "tungstenite 0.28.0", + "webpki-roots 0.26.11", ] [[package]] @@ -6848,19 +6853,18 @@ checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31" [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" dependencies = [ "byteorder", "bytes", "data-encoding", - "http 0.2.12", + "http 1.4.0", "httparse", "log", "native-tls", "rand 0.8.5", - "rustls 0.21.12", "sha1", "thiserror 1.0.69", "url", @@ -6869,21 +6873,21 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.21.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" dependencies = [ - "byteorder", "bytes", "data-encoding", "http 1.4.0", "httparse", "log", "native-tls", - "rand 0.8.5", + "rand 0.9.2", + "rustls", + "rustls-pki-types", "sha1", - "thiserror 1.0.69", - "url", + "thiserror 2.0.18", "utf-8", ] @@ -7432,9 +7436,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.5", +] [[package]] name = "webpki-roots" diff --git a/examples/agent_dispatch/Cargo.toml b/examples/agent_dispatch/Cargo.toml index bbcc32488..6f9bcaca9 100644 --- a/examples/agent_dispatch/Cargo.toml +++ b/examples/agent_dispatch/Cargo.toml @@ -6,7 +6,7 @@ publish = false [dependencies] tokio = { workspace = true, features = ["full", "parking_lot"] } -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } env_logger = { workspace = true } log = { workspace = true } livekit-protocol = { workspace = true } diff --git a/examples/api/Cargo.toml b/examples/api/Cargo.toml index bdd46091a..7aead7623 100644 --- a/examples/api/Cargo.toml +++ b/examples/api/Cargo.toml @@ -6,4 +6,4 @@ publish = false [dependencies] tokio = { workspace = true, features = ["full", "parking_lot"] } -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } diff --git a/examples/basic_room/Cargo.toml b/examples/basic_room/Cargo.toml index 9b8651984..7298801cc 100644 --- a/examples/basic_room/Cargo.toml +++ b/examples/basic_room/Cargo.toml @@ -8,5 +8,5 @@ publish = false tokio = { workspace = true, features = ["full"] } env_logger = { workspace = true } livekit = { workspace = true, features = ["rustls-tls-native-roots"] } -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } log = { workspace = true } diff --git a/examples/encrypted_text_stream/Cargo.toml b/examples/encrypted_text_stream/Cargo.toml index 272a26f56..b3205544e 100644 --- a/examples/encrypted_text_stream/Cargo.toml +++ b/examples/encrypted_text_stream/Cargo.toml @@ -8,6 +8,6 @@ publish = false tokio = { workspace = true, features = ["full"] } futures-util = { workspace = true, default-features = false, features = ["sink"] } livekit = { workspace = true, features = ["rustls-tls-native-roots"] } -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } log = { workspace = true } env_logger = { workspace = true } diff --git a/examples/local_audio/Cargo.toml b/examples/local_audio/Cargo.toml index 200f528b3..8eda84b9c 100644 --- a/examples/local_audio/Cargo.toml +++ b/examples/local_audio/Cargo.toml @@ -8,7 +8,7 @@ publish = false tokio = { workspace = true, features = ["full"] } env_logger = { workspace = true } livekit = { workspace = true, features = ["rustls-tls-native-roots"] } -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } libwebrtc = { workspace = true } log = { workspace = true } cpal = "0.15" diff --git a/examples/local_video/Cargo.toml b/examples/local_video/Cargo.toml index ef97cd376..64f892500 100644 --- a/examples/local_video/Cargo.toml +++ b/examples/local_video/Cargo.toml @@ -27,7 +27,7 @@ tokio = { workspace = true, features = ["full", "parking_lot"] } livekit = { workspace = true, features = ["rustls-tls-native-roots"] } webrtc-sys = { workspace = true } libwebrtc = { workspace = true } -livekit-api = { workspace = true } +livekit-api = { workspace = true, features = ["webhooks"] } yuv-sys = { workspace = true, features = ["jpeg"] } futures = { workspace = true } clap = { workspace = true, features = ["derive"] } diff --git a/examples/screensharing/Cargo.toml b/examples/screensharing/Cargo.toml index ad97cc5df..56ad5593e 100644 --- a/examples/screensharing/Cargo.toml +++ b/examples/screensharing/Cargo.toml @@ -8,6 +8,6 @@ publish = false tokio = { workspace = true, features = ["full"] } env_logger = { workspace = true } livekit = { workspace = true, features = ["rustls-tls-native-roots"] } -livekit-api = { workspace = true } +livekit-api = { workspace = true, features = ["webhooks"] } log = { workspace = true } clap = { workspace = true, features = ["derive"] } diff --git a/examples/webhooks/Cargo.toml b/examples/webhooks/Cargo.toml index e05e00c5b..031595507 100644 --- a/examples/webhooks/Cargo.toml +++ b/examples/webhooks/Cargo.toml @@ -5,6 +5,6 @@ edition.workspace = true publish = false [dependencies] -livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } +livekit-api = { workspace = true, features = ["webhooks", "rustls-tls-native-roots"] } hyper = { version = "0.14", features = ["full"] } tokio = { workspace = true, features = ["full"] } diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 0329e979e..d90f26d80 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -8,7 +8,7 @@ repository.workspace = true [features] # By default ws TLS is not enabled -default = ["services-tokio", "access-token", "webhooks"] +default = ["services-tokio"] signal-client-tokio = [ "dep:tokio-tungstenite", @@ -89,10 +89,10 @@ jsonwebtoken = { version = "10", default-features = false, features = ["rust_cry # signal_client livekit-runtime = { workspace = true, optional = true} -tokio-tungstenite = { version = "0.20", optional = true } +tokio-tungstenite = { version = "0.28", optional = true } async-tungstenite = { version = "0.25.0", features = [ "async-std-runtime", "async-native-tls"], optional = true } tokio = { workspace = true, default-features = false, features = ["sync", "macros", "signal", "io-util", "net"], optional = true } -tokio-rustls = { version = "0.24", optional = true } +tokio-rustls = { version = "0.26", optional = true } rustls-native-certs = { version = "0.6", optional = true } futures-util = { workspace = true, default-features = false, features = [ "sink" ], optional = true } diff --git a/livekit-api/src/lib.rs b/livekit-api/src/lib.rs index 8674878a5..af4ece9ef 100644 --- a/livekit-api/src/lib.rs +++ b/livekit-api/src/lib.rs @@ -15,7 +15,7 @@ #[cfg(feature = "access-token")] pub mod access_token; -#[cfg(any(feature = "services-tokio", feature = "services-async"))] +#[cfg(all(any(feature = "services-tokio", feature = "services-async"), feature = "access-token"))] pub mod services; #[cfg(any( diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 32867d17a..f19a69c29 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -91,6 +91,14 @@ impl Default for SignalSdkOptions { } } +#[cfg(all(feature = "signal-client-tokio", feature = "rustls-tls-native-roots"))] +#[derive(Debug, Default, Clone)] +pub struct TlsConfig(pub Option); + +#[cfg(not(all(feature = "signal-client-tokio", feature = "rustls-tls-native-roots")))] +#[derive(Debug, Default, Clone)] +pub struct TlsConfig; + #[derive(Debug, Clone)] #[non_exhaustive] pub struct SignalOptions { @@ -101,6 +109,8 @@ pub struct SignalOptions { pub single_peer_connection: bool, /// Timeout for each individual signal connection attempt pub connect_timeout: Duration, + /// Custom TLS config + pub tls_config: TlsConfig, } impl Default for SignalOptions { @@ -111,6 +121,7 @@ impl Default for SignalOptions { sdk_options: SignalSdkOptions::default(), single_peer_connection: false, connect_timeout: SIGNAL_CONNECT_TIMEOUT, + tls_config: TlsConfig::default(), } } } @@ -273,60 +284,67 @@ impl SignalInner { // For initial connection: reconnect=false, reconnect_reason=None, participant_sid="" let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?; // Try to connect to the SignalClient - let (stream, mut events, single_pc_mode_active) = - match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await { - Ok((new_stream, stream_events)) => { - log::debug!( - "signal connection successful: path={}, single_pc_mode={}", - if use_v1_path { "v1" } else { "v0" }, - use_v1_path - ); - (new_stream, stream_events, use_v1_path) + let (stream, mut events, single_pc_mode_active) = match SignalStream::connect( + lk_url.clone(), + token, + options.connect_timeout, + options.tls_config.clone(), + ) + .await + { + Ok((new_stream, stream_events)) => { + log::debug!( + "signal connection successful: path={}, single_pc_mode={}", + if use_v1_path { "v1" } else { "v0" }, + use_v1_path + ); + (new_stream, stream_events, use_v1_path) + } + Err(err) => { + log::warn!( + "signal connection failed on {} path: {:?}", + if use_v1_path { "v1" } else { "v0" }, + err + ); + + if let SignalError::TokenFormat = err { + return Err(err); } - Err(err) => { - log::warn!( - "signal connection failed on {} path: {:?}", - if use_v1_path { "v1" } else { "v0" }, - err - ); - - if let SignalError::TokenFormat = err { - return Err(err); - } - // Only fallback to v0 if the v1 endpoint returned 404 (not found). - // Other errors (401, 403, 500, etc.) indicate real issues that shouldn't - // be masked by falling back to a different signaling mode. - let is_not_found = - matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404); - - if use_v1_path && is_not_found { - let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?; - log::warn!("v1 path not found (404), falling back to v0 path"); - match SignalStream::connect( - lk_url_v0.clone(), - token, - options.connect_timeout, - ) - .await - { - Ok((new_stream, stream_events)) => (new_stream, stream_events, false), - Err(err) => { - log::error!("v0 fallback also failed: {:?}", err); - if let SignalError::TokenFormat = err { - return Err(err); - } - Self::validate(lk_url_v0).await?; + // Only fallback to v0 if the v1 endpoint returned 404 (not found). + // Other errors (401, 403, 500, etc.) indicate real issues that shouldn't + // be masked by falling back to a different signaling mode. + let is_not_found = + matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404); + + if use_v1_path && is_not_found { + let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?; + log::warn!("v1 path not found (404), falling back to v0 path"); + match SignalStream::connect( + lk_url_v0.clone(), + token, + options.connect_timeout, + options.tls_config.clone(), + ) + .await + { + Ok((new_stream, stream_events)) => (new_stream, stream_events, false), + Err(err) => { + log::error!("v0 fallback also failed: {:?}", err); + if let SignalError::TokenFormat = err { return Err(err); } + Self::validate(lk_url_v0).await?; + return Err(err); } - } else { - // Connection failed, try to retrieve more information - Self::validate(lk_url).await?; - return Err(err); } + } else { + // Connection failed, try to retrieve more information + Self::validate(lk_url).await?; + return Err(err); } - }; + } + }; let join_response = get_join_response(&mut events).await?; @@ -401,8 +419,13 @@ impl SignalInner { get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid) .unwrap(); - let (new_stream, mut events) = - SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?; + let (new_stream, mut events) = SignalStream::connect( + lk_url, + &token, + self.options.connect_timeout, + self.options.tls_config.clone(), + ) + .await?; let reconnect_response = get_reconnect_response(&mut events).await?; *stream = Some(new_stream); @@ -796,7 +819,13 @@ mod tests { }); let url = url::Url::parse(&format!("ws://127.0.0.1:{}", addr.port())).unwrap(); - let result = SignalStream::connect(url, "fake-token", Duration::from_millis(500)).await; + let result = SignalStream::connect( + url, + "fake-token", + Duration::from_millis(500), + TlsConfig::default(), + ) + .await; assert!(result.is_err()); let err = result.unwrap_err(); diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 89c429fdc..d31c5e34a 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -24,7 +24,7 @@ use std::{env, io, time::Duration}; use tokio::sync::{mpsc, oneshot}; #[cfg(feature = "signal-client-tokio")] -use base64; +use base64::{engine::general_purpose, Engine as _}; #[cfg(feature = "signal-client-tokio")] use tokio::{ @@ -53,6 +53,8 @@ use async_tungstenite::{ WebSocketStream, }; +use crate::signal_client::TlsConfig; + use super::{SignalError, SignalResult}; type WebSocket = WebSocketStream>; @@ -89,8 +91,9 @@ impl SignalStream { url: url::Url, token: &str, connect_timeout: Duration, + tls_config: TlsConfig, ) -> SignalResult<(Self, mpsc::UnboundedReceiver>)> { - let connect_fut = Self::connect_inner(url, token); + let connect_fut = Self::connect_inner(url, token, tls_config); livekit_runtime::timeout(connect_timeout, connect_fut) .await .map_err(|_| SignalError::Timeout("signal connection timed out".into()))? @@ -99,9 +102,10 @@ impl SignalStream { async fn connect_inner( url: url::Url, token: &str, + tls_config: TlsConfig, ) -> SignalResult<(Self, mpsc::UnboundedReceiver>)> { log::info!("connecting to {}", url); - let mut request = url.clone().into_client_request()?; + let mut request = url.as_str().into_client_request()?; let auth_header = HeaderValue::from_str(&format!("Bearer {token}")) .map_err(|_| SignalError::TokenFormat)?; request.headers_mut().insert(AUTHORIZATION, auth_header); @@ -116,201 +120,223 @@ impl SignalStream { }; // Connect directly or through proxy - let ws_stream = if let Ok(proxy_url) = proxy_env { - if !proxy_url.is_empty() { - log::info!("Using proxy: {}", proxy_url); - let proxy_url = url::Url::parse(&proxy_url).map_err(|e| { - WsError::Io(io::Error::new( - io::ErrorKind::InvalidInput, - format!("Invalid proxy URL: {}", e), - )) - })?; - - let host = url.host_str().ok_or_else(|| { - WsError::Io(io::Error::new( - io::ErrorKind::InvalidInput, - "Target URL has no host", - )) - })?; - - let port = url.port_or_known_default().ok_or_else(|| { - WsError::Io(io::Error::new( - io::ErrorKind::InvalidInput, - "Target URL has no port and no default for scheme", - )) - })?; + let ws_stream: WebSocketStream<_> = if let Some(proxy_url) = + proxy_env.ok().filter(|proxy_url| !proxy_url.is_empty()) + { + log::info!("Using proxy: {}", proxy_url); + let proxy_url = url::Url::parse(&proxy_url).map_err(|e| { + WsError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Invalid proxy URL: {}", e), + )) + })?; + + let host = url.host_str().ok_or_else(|| { + WsError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "Target URL has no host", + )) + })?; + + let port = url.port_or_known_default().ok_or_else(|| { + WsError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "Target URL has no port and no default for scheme", + )) + })?; + + let proxy_host = proxy_url.host_str().ok_or_else(|| { + WsError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "Proxy URL has no host", + )) + })?; + + let proxy_port = proxy_url.port_or_known_default().unwrap_or(80); + let proxy_addr = format!("{}:{}", proxy_host, proxy_port); + + let mut proxy_stream = + TokioTcpStream::connect(proxy_addr).await.map_err(WsError::Io)?; + + let mut proxy_auth_header = None; + if let Some(password) = proxy_url.password() { + let auth = format!("{}:{}", proxy_url.username(), password); + let auth = format!("Basic {}", general_purpose::STANDARD.encode(auth)); + proxy_auth_header = Some(auth); + } - let proxy_host = proxy_url.host_str().ok_or_else(|| { - WsError::Io(io::Error::new( - io::ErrorKind::InvalidInput, - "Proxy URL has no host", - )) - })?; + // Send CONNECT request + let target = format!("{}:{}", host, port); + let mut connect_req = + format!("CONNECT {} HTTP/1.1\r\nHost: {}\r\n", target, target); - let proxy_port = proxy_url.port_or_known_default().unwrap_or(80); - let proxy_addr = format!("{}:{}", proxy_host, proxy_port); + // Add proxy authorization if needed + if let Some(auth) = proxy_auth_header { + connect_req.push_str(&format!("Proxy-Authorization: {}\r\n", auth)); + } - let mut proxy_stream = - TokioTcpStream::connect(proxy_addr).await.map_err(WsError::Io)?; + // Finalize request + connect_req.push_str("\r\n"); - let mut proxy_auth_header = None; - if let Some(password) = proxy_url.password() { - let auth = format!("{}:{}", proxy_url.username(), password); - let auth = format!("Basic {}", base64::encode(auth)); - proxy_auth_header = Some(auth); - } + log::debug!("Sending CONNECT request to proxy"); + proxy_stream.write_all(connect_req.as_bytes()).await.map_err(WsError::Io)?; - // Send CONNECT request - let target = format!("{}:{}", host, port); - let mut connect_req = - format!("CONNECT {} HTTP/1.1\r\nHost: {}\r\n", target, target); + // Read and parse response + let mut response = Vec::new(); + let mut buf = [0u8; 4096]; + let mut headers_complete = false; - // Add proxy authorization if needed - if let Some(auth) = proxy_auth_header { - connect_req.push_str(&format!("Proxy-Authorization: {}\r\n", auth)); + while !headers_complete { + let n = proxy_stream.read(&mut buf).await.map_err(WsError::Io)?; + if n == 0 { + return Err(WsError::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Proxy connection closed while reading response", + )) + .into()); } - // Finalize request - connect_req.push_str("\r\n"); - - log::debug!("Sending CONNECT request to proxy"); - proxy_stream.write_all(connect_req.as_bytes()).await.map_err(WsError::Io)?; - - // Read and parse response - let mut response = Vec::new(); - let mut buf = [0u8; 4096]; - let mut headers_complete = false; - - while !headers_complete { - let n = proxy_stream.read(&mut buf).await.map_err(WsError::Io)?; - if n == 0 { - return Err(WsError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "Proxy connection closed while reading response", - )) - .into()); - } - - response.extend_from_slice(&buf[..n]); + response.extend_from_slice(&buf[..n]); - // Check if we've received the end of headers (double CRLF) - if response.windows(4).any(|w| w == b"\r\n\r\n") { - headers_complete = true; - } + // Check if we've received the end of headers (double CRLF) + if response.windows(4).any(|w| w == b"\r\n\r\n") { + headers_complete = true; } + } - // Parse status line - let response_str = String::from_utf8_lossy(&response); - let status_line = response_str.lines().next().ok_or_else(|| { - WsError::Io(io::Error::new( - io::ErrorKind::InvalidData, - "Invalid proxy response", - )) - })?; - - // Check status code - if !status_line.contains("200") { - return Err(WsError::Io(io::Error::new( - io::ErrorKind::ConnectionRefused, - format!("Proxy connection failed: {}", status_line), - )) - .into()); - } + // Parse status line + let response_str = String::from_utf8_lossy(&response); + let status_line = response_str.lines().next().ok_or_else(|| { + WsError::Io(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid proxy response", + )) + })?; + + // Check status code + if !status_line.contains("200") { + return Err(WsError::Io(io::Error::new( + io::ErrorKind::ConnectionRefused, + format!("Proxy connection failed: {}", status_line), + )) + .into()); + } - log::debug!("Proxy connection established to {}", target); - - // Create MaybeTlsStream based on original URL scheme - let stream = if url.scheme() == "wss" { - // Only enable proxy TLS support when rustls-tls-native-roots is enabled - #[cfg(feature = "rustls-tls-native-roots")] - { - // For WSS, we need to establish TLS over the proxy connection - use std::sync::Arc; - use tokio_rustls::{rustls, TlsConnector}; - - // Load native root certificates - let mut root_store = rustls::RootCertStore::empty(); - match rustls_native_certs::load_native_certs() { - Ok(certs) => { - let roots: Vec = certs - .into_iter() - .map(|cert| rustls::Certificate(cert.0)) - .collect(); - - for root in roots { - root_store.add(&root).map_err(|e| { - WsError::Io(io::Error::new( - io::ErrorKind::Other, - format!( - "Failed to parse root certificate: {:?}", - e - ), - )) - })?; + log::debug!("Proxy connection established to {}", target); + + // Create MaybeTlsStream based on original URL scheme + let stream = if url.scheme() == "wss" { + // Only enable proxy TLS support when rustls-tls-native-roots is enabled + #[cfg(feature = "rustls-tls-native-roots")] + { + // For WSS, we need to establish TLS over the proxy connection + use std::sync::Arc; + use tokio_rustls::{rustls, TlsConnector}; + + let tls_config = match tls_config.0 { + Some(tls_config) => tls_config, + None => { + // Load native root certificates + let mut root_store = rustls::RootCertStore::empty(); + match rustls_native_certs::load_native_certs() { + Ok(certs) => { + let roots: Vec = certs + .into_iter() + .map(|cert| { + rustls::pki_types::CertificateDer::from(cert.0) + }) + .collect(); + + for root in roots { + root_store.add(root).map_err(|e| { + WsError::Io(io::Error::new( + io::ErrorKind::Other, + format!( + "Failed to parse root certificate: {:?}", + e + ), + )) + })?; + } + } + Err(e) => { + return Err(WsError::Io(io::Error::new( + io::ErrorKind::Other, + format!( + "Could not load native root certificates: {}", + e + ), + )) + .into()); } } - Err(e) => { - return Err(WsError::Io(io::Error::new( - io::ErrorKind::Other, - format!("Could not load native root certificates: {}", e), - )) - .into()); - } - } - let tls_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); + rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth() + } + }; - let server_name = rustls::ServerName::try_from(host).map_err(|_| { + let server_name = rustls::pki_types::ServerName::try_from(host.to_string()) + .map_err(|_| { WsError::Io(io::Error::new( io::ErrorKind::InvalidInput, format!("Invalid DNS name: {}", host), )) })?; - let connector = TlsConnector::from(Arc::new(tls_config)); - let tls_stream = connector - .connect(server_name, proxy_stream) - .await - .map_err(|e| { - WsError::Io(io::Error::new( - io::ErrorKind::Other, - format!("TLS connection error: {}", e), - )) - })?; - - MaybeTlsStream::Rustls(tls_stream) - } + let connector = TlsConnector::from(Arc::new(tls_config)); + let tls_stream = + connector.connect(server_name, proxy_stream).await.map_err(|e| { + WsError::Io(io::Error::new( + io::ErrorKind::Other, + format!("TLS connection error: {}", e), + )) + })?; + + MaybeTlsStream::Rustls(tls_stream) + } + + #[cfg(not(feature = "rustls-tls-native-roots"))] + { + // For non-rustls-tls-native-roots builds, don't support proxy for WSS + return Err(WsError::Io(io::Error::new( + io::ErrorKind::Other, + "WSS over proxy requires rustls-tls-native-roots feature", + )) + .into()); + } + } else { + // For plain WS, just use the proxy stream directly + MaybeTlsStream::Plain(proxy_stream) + }; - #[cfg(not(feature = "rustls-tls-native-roots"))] - { - // For non-rustls-tls-native-roots builds, don't support proxy for WSS - return Err(WsError::Io(io::Error::new( - io::ErrorKind::Other, - "WSS over proxy requires rustls-tls-native-roots feature", - )) - .into()); + // Now perform WebSocket handshake over the established connection + let (ws_stream, _) = + tokio_tungstenite::client_async_with_config(request, stream, None).await?; + ws_stream + } else { + // No proxy specified, connect directly + #[cfg(feature = "rustls-tls-native-roots")] + { + let connector = match tls_config.0 { + Some(config) => { + Some(tokio_tungstenite::Connector::Rustls(std::sync::Arc::new(config))) } - } else { - // For plain WS, just use the proxy stream directly - MaybeTlsStream::Plain(proxy_stream) + None => None, }; - - // Now perform WebSocket handshake over the established connection - let (ws_stream, _) = - tokio_tungstenite::client_async_with_config(request, stream, None).await?; + let (ws_stream, _) = tokio_tungstenite::connect_async_tls_with_config( + request, None, false, connector, + ) + .await?; ws_stream - } else { - // No proxy specified, connect directly + } + + #[cfg(not(feature = "rustls-tls-native-roots"))] + { let (ws_stream, _) = connect_async(request).await?; ws_stream } - } else { - // Non-tokio build or no proxy - connect directly - let (ws_stream, _) = connect_async(request).await?; - ws_stream }; ws_stream @@ -359,7 +385,7 @@ impl SignalStream { InternalMessage::Signal { signal, response_chn } => { let data = proto::SignalRequest { message: Some(signal) }.encode_to_vec(); - if let Err(err) = ws_writer.send(Message::Binary(data)).await { + if let Err(err) = ws_writer.send(Message::Binary(data.into())).await { let _ = response_chn.send(Err(err.into())); break; } @@ -367,7 +393,7 @@ impl SignalStream { let _ = response_chn.send(Ok(())); } InternalMessage::Pong { ping_data } => { - if let Err(err) = ws_writer.send(Message::Pong(ping_data)).await { + if let Err(err) = ws_writer.send(Message::Pong(ping_data.into())).await { log::error!("failed to send pong message: {:?}", err); } } @@ -390,7 +416,7 @@ impl SignalStream { while let Some(msg) = ws_reader.next().await { match msg { Ok(Message::Binary(data)) => { - let res = proto::SignalResponse::decode(data.as_slice()) + let res = proto::SignalResponse::decode(data) .expect("failed to decode SignalResponse"); if let Some(msg) = res.message { @@ -398,7 +424,8 @@ impl SignalStream { } } Ok(Message::Ping(data)) => { - let _ = internal_tx.send(InternalMessage::Pong { ping_data: data }).await; + let _ = + internal_tx.send(InternalMessage::Pong { ping_data: data.to_vec() }).await; continue; } Ok(Message::Close(close)) => { diff --git a/livekit-ffi/Cargo.toml b/livekit-ffi/Cargo.toml index 6581dba5d..aef206a0f 100644 --- a/livekit-ffi/Cargo.toml +++ b/livekit-ffi/Cargo.toml @@ -47,7 +47,7 @@ prost-build = { workspace = true } webrtc-sys-build = { workspace = true } [dev-dependencies] -livekit-api = { workspace = true } +livekit-api = { workspace = true, features = ["webhooks"] } [lib] crate-type = ["lib", "staticlib", "cdylib"] diff --git a/livekit-uniffi/Cargo.toml b/livekit-uniffi/Cargo.toml index 8293512e9..64dd82fa6 100644 --- a/livekit-uniffi/Cargo.toml +++ b/livekit-uniffi/Cargo.toml @@ -13,7 +13,7 @@ publish = false [dependencies] livekit-protocol = { workspace = true } -livekit-api = { workspace = true } +livekit-api = { workspace = true, features = ["webhooks"] } uniffi = { version = "0.30.0", features = ["cli", "scaffolding-ffi-buffer-fns"] } log = { workspace = true } tokio = { workspace = true, features = ["sync"] } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index ffd383e56..ba237e882 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -22,6 +22,7 @@ use libwebrtc::{ rtp_transceiver::RtpTransceiver, RtcError, }; +pub use livekit_api::signal_client::TlsConfig; use livekit_api::signal_client::{SignalOptions, SignalSdkOptions, SIGNAL_CONNECT_TIMEOUT}; use livekit_protocol::observer::Dispatcher; use livekit_protocol::{self as proto, encryption}; @@ -374,6 +375,8 @@ pub struct RoomOptions { pub single_peer_connection: bool, /// Timeout for each individual signal connection attempt pub connect_timeout: Duration, + /// Custom TLS config + pub tls_config: TlsConfig, } impl Default for RoomOptions { @@ -396,6 +399,7 @@ impl Default for RoomOptions { sdk_options: RoomSdkOptions::default(), single_peer_connection: false, connect_timeout: SIGNAL_CONNECT_TIMEOUT, + tls_config: TlsConfig::default(), } } } @@ -491,6 +495,7 @@ impl Room { signal_options.adaptive_stream = options.adaptive_stream; signal_options.single_peer_connection = options.single_peer_connection; signal_options.connect_timeout = options.connect_timeout; + signal_options.tls_config = options.tls_config.clone(); let (rtc_engine, join_response, engine_events) = RtcEngine::connect( url, token, diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 995070ed4..cbd6699d4 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -638,7 +638,7 @@ impl EngineInner { /// When waiting for reconnection, it ensures we're always using the latest session. async fn wait_reconnection( &self, - ) -> EngineResult<(RwLockReadGuard, AsyncRwLockReadGuard<()>)> { + ) -> EngineResult<(RwLockReadGuard<'_, EngineHandle>, AsyncRwLockReadGuard<'_, ()>)> { let r_lock = self.reconnecting_lock.read().await; let running_handle = self.running_handle.read();