Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/long-keys-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Cancel losing timeout in AudioMixer race to prevent orphaned timers
102 changes: 102 additions & 0 deletions packages/livekit-rtc/src/audio_mixer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,106 @@ describe('AudioMixer', () => {
// Should get at least 2 frames (stream exhausts after 2)
expect(frames.length).toBeGreaterThanOrEqual(2);
});

it('completes mixing without lingering timers when iterator is fast', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Long timeout so the iterator always wins the race.
// Before the fix, each iteration leaked a 5s timer; with the fix,
// cancel() clears it immediately so the mixer shuts down without delay.
streamTimeoutMs: 5000,
});

const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 2) break;
}

await mixer.aclose();

expect(frames.length).toBe(2);
// Verify the frames contain the expected mixed value
for (const frame of frames) {
expect(frame.data[0]).toBe(42);
}
});

it('produces frames even with many race iterations', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
streamTimeoutMs: 5000,
});

// Use more frames to stress multiple race iterations
const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 4) break;
}

await mixer.aclose();

expect(frames.length).toBe(4);
// All frames should contain the expected value
for (const frame of frames) {
expect(frame.data[0]).toBe(10);
}
});

it('handles slow streams via timeout path', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Very short timeout to trigger the timeout path
streamTimeoutMs: 1,
});

// Create a stream that is slower than the timeout
async function* slowStream(): AsyncGenerator<AudioFrame> {
await new Promise((resolve) => setTimeout(resolve, 200));
const data = new Int16Array(numChannels * samplesPerChannel).fill(500);
yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel);
}

// Suppress the expected console.warn from the timeout path
const originalWarn = console.warn;
const warnings: string[] = [];
console.warn = (...args: unknown[]) => {
warnings.push(args.map(String).join(' '));
};

try {
mixer.addStream(slowStream());

// The mixer should produce a frame (zero-padded due to timeout)
// and auto-close when the stream exhausts.
const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 1) break;
}

await mixer.aclose();

// The timeout warning should have been logged
expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true);
} finally {
console.warn = originalWarn;
}
});
});
12 changes: 9 additions & 3 deletions packages/livekit-rtc/src/audio_mixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export class AudioMixer {
// Accumulate data until we have at least chunkSize samples
while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) {
try {
const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]);
const result = await this.timeoutRace(iterator.next(), this.streamTimeoutMs);

if (result === 'timeout') {
console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`);
Expand Down Expand Up @@ -412,7 +412,13 @@ export class AudioMixer {
return new Promise((resolve) => setTimeout(resolve, ms));
}

private timeout(ms: number): Promise<'timeout'> {
return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms));
/** Race a promise against a timeout. The losing setTimeout is automatically
* cleared via `.finally()` so callers don't need to manage cleanup. */
private timeoutRace<T>(promise: Promise<T>, ms: number): Promise<T | 'timeout'> {
let timer: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<'timeout'>((resolve) => {
timer = setTimeout(() => resolve('timeout'), ms);
});
return Promise.race([promise.finally(() => clearTimeout(timer)), timeoutPromise]);
}
}
Loading