From 468f9877733b0c7f60794c2261170fe824556cd3 Mon Sep 17 00:00:00 2001 From: PBDm Date: Wed, 19 Jun 2024 19:50:40 +0800 Subject: [PATCH 1/5] feat(av-cliper): concurrency download for hls-loader --- .../data-loader/__tests__/hls-loader.test.ts | 17 +++++ .../av-cliper/src/data-loader/hls-loader.ts | 63 ++++++++++++++++--- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts index d6a4fd07..e4fe9507 100644 --- a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts +++ b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts @@ -41,3 +41,20 @@ test('hls loader default time', async () => { expect(video?.timestamp).toBe(10e6); video?.close(); }); + +test('hls loader async load m4s files', async () => { + const loader = await createHLSLoader(m3u8Url, 5); + const [{ actualStartTime, actualEndTime, stream }] = loader.load() ?? []; + expect(stream).toBeInstanceOf(ReadableStream); + expect([actualStartTime, Math.round(actualEndTime / 1e6)]).toEqual([0, 60]); + + const clip = new MP4Clip(stream); + await clip.ready; + expect(Math.round(clip.meta.duration / 1e6)).toBe( + Math.round((actualEndTime - actualStartTime) / 1e6), + ); + + const { video } = await clip.tick(10e6); + expect(video?.timestamp).toBe(10e6); + video?.close(); +}); diff --git a/packages/av-cliper/src/data-loader/hls-loader.ts b/packages/av-cliper/src/data-loader/hls-loader.ts index a72bdf80..704aacb1 100644 --- a/packages/av-cliper/src/data-loader/hls-loader.ts +++ b/packages/av-cliper/src/data-loader/hls-loader.ts @@ -3,7 +3,7 @@ import { Parser } from 'm3u8-parser'; /** * 创建一个 HLS 资源加载器 */ -export async function createHLSLoader(m3u8URL: string) { +export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const parser = new Parser(); parser.push(await (await fetch(m3u8URL)).text()); parser.end(); @@ -17,12 +17,64 @@ export async function createHLSLoader(m3u8URL: string) { ); const base = new URL(m3u8URL, location.href); + const segmentBufferFetchqueue = {} as Record>; + + async function downloadSegGroup() { + function createTaskQueue(concurrency: number) { + let running = 0; + const queue = [] as Function[]; + + async function runTask(task: Function) { + queue.push(task); + next(); + } + + async function next() { + if (running < concurrency && queue.length) { + const task = queue.shift(); + running++; + try { + if (task) { + await task(); + } + } catch (err) { + console.error(err); + } + running--; + next(); + } + } + + return runTask; + } + + async function fetchSegmentBufferPromise(url: string) { + return (await fetch(url)).arrayBuffer(); + } + + const runTask = createTaskQueue(concurrency); + + for (const [, gData] of Object.entries(segGroup)) { + for (const [, item] of gData.entries()) { + const url = new URL(item.uri, base).href; + runTask( + () => (segmentBufferFetchqueue[url] = fetchSegmentBufferPromise(url)), + ); + } + } + } + + async function getSegmentBuffer(url: string) { + return segmentBufferFetchqueue[url].then((value) => value); + } + return { /** * 下载期望时间区间的分配数据,封装成流 * 每个分片包含一个时间段,实际下载的分片数据时长会略大于期望的时间区间 */ load(expectStartTime = 0, expectEndTime = Infinity) { + downloadSegGroup(); const filterSegGroup = {} as Record< string, { @@ -86,13 +138,8 @@ export async function createHLSLoader(m3u8URL: string) { ); }, pull: async (ctrl) => { - ctrl.enqueue( - new Uint8Array( - await ( - await fetch(new URL(segments[segIdx].uri, base).href) - ).arrayBuffer(), - ), - ); + const url = new URL(segments[segIdx].uri, base).href; + ctrl.enqueue(new Uint8Array(await getSegmentBuffer(url))); segIdx += 1; if (segIdx >= segments.length) ctrl.close(); }, From ee7f4249fe867f34827e1087df4a62bd00709f7a Mon Sep 17 00:00:00 2001 From: PBDm Date: Thu, 20 Jun 2024 14:33:32 +0800 Subject: [PATCH 2/5] feat(av-cliper): concurrency download for hls-loader --- .../av-cliper/src/data-loader/hls-loader.ts | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/packages/av-cliper/src/data-loader/hls-loader.ts b/packages/av-cliper/src/data-loader/hls-loader.ts index 704aacb1..a22fd03a 100644 --- a/packages/av-cliper/src/data-loader/hls-loader.ts +++ b/packages/av-cliper/src/data-loader/hls-loader.ts @@ -1,4 +1,5 @@ import { Parser } from 'm3u8-parser'; +import { Log } from '../log'; /** * 创建一个 HLS 资源加载器 @@ -19,12 +20,22 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const segmentBufferFetchqueue = {} as Record>; - async function downloadSegGroup() { + async function downloadSegGroup( + filterSegGroup: Record< + string, + { + actualStartTime: number; + actualEndTime: number; + segments: Parser['manifest']['segments']; + } + >, + ctrl: ReadableStreamDefaultController, + ) { function createTaskQueue(concurrency: number) { let running = 0; - const queue = [] as Function[]; + const queue = [] as Array<() => Promise>; - async function runTask(task: Function) { + async function runTask(task: () => Promise) { queue.push(task); next(); } @@ -34,14 +45,14 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const task = queue.shift(); running++; try { - if (task) { - await task(); - } + await task?.(); + next(); } catch (err) { - console.error(err); + // 异常时中断流 + ctrl.close(); + Log.error(err); } running--; - next(); } } @@ -54,8 +65,8 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const runTask = createTaskQueue(concurrency); - for (const [, gData] of Object.entries(segGroup)) { - for (const [, item] of gData.entries()) { + for (const [, gData] of Object.entries(filterSegGroup)) { + for (const [, item] of gData.segments.entries()) { const url = new URL(item.uri, base).href; runTask( () => (segmentBufferFetchqueue[url] = fetchSegmentBufferPromise(url)), @@ -65,7 +76,7 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { } async function getSegmentBuffer(url: string) { - return segmentBufferFetchqueue[url].then((value) => value); + return segmentBufferFetchqueue[url]; } return { @@ -74,7 +85,6 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { * 每个分片包含一个时间段,实际下载的分片数据时长会略大于期望的时间区间 */ load(expectStartTime = 0, expectEndTime = Infinity) { - downloadSegGroup(); const filterSegGroup = {} as Record< string, { @@ -129,6 +139,7 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { actualEndTime, stream: new ReadableStream({ start: async (ctrl) => { + downloadSegGroup(filterSegGroup, ctrl); ctrl.enqueue( new Uint8Array( await ( From e790dd27dc73ec59fdaffe8af296d272c64b8418 Mon Sep 17 00:00:00 2001 From: PBDm Date: Thu, 20 Jun 2024 16:24:21 +0800 Subject: [PATCH 3/5] feat(av-cliper): concurrency download for hls-loader --- .../data-loader/__tests__/hls-loader.test.ts | 15 ++++++++- .../av-cliper/src/data-loader/hls-loader.ts | 33 ++++++++----------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts index e4fe9507..7a4f87bc 100644 --- a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts +++ b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts @@ -1,4 +1,4 @@ -import { expect, test } from 'vitest'; +import { expect, test, vi } from 'vitest'; import { MP4Clip } from '../../clips/mp4-clip'; import { createHLSLoader } from '../hls-loader'; @@ -58,3 +58,16 @@ test('hls loader async load m4s files', async () => { expect(video?.timestamp).toBe(10e6); video?.close(); }); + +test('hla loader async load m4s files with error stop correctly', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch'); + try { + const loader = await createHLSLoader(m3u8Url, 4); + fetchSpy.mockRejectedValueOnce(new Error('fetch error')); + loader.load() ?? []; + } catch (e: any) { + expect(e.message).toBe('fetch error'); + } + expect(fetchSpy).toHaveBeenCalledTimes(6); + fetchSpy.mockRestore(); +}); diff --git a/packages/av-cliper/src/data-loader/hls-loader.ts b/packages/av-cliper/src/data-loader/hls-loader.ts index a22fd03a..8021131e 100644 --- a/packages/av-cliper/src/data-loader/hls-loader.ts +++ b/packages/av-cliper/src/data-loader/hls-loader.ts @@ -20,15 +20,8 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const segmentBufferFetchqueue = {} as Record>; - async function downloadSegGroup( - filterSegGroup: Record< - string, - { - actualStartTime: number; - actualEndTime: number; - segments: Parser['manifest']['segments']; - } - >, + async function downloadSegments( + segments: Parser['manifest']['segments'], ctrl: ReadableStreamDefaultController, ) { function createTaskQueue(concurrency: number) { @@ -48,8 +41,8 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { await task?.(); next(); } catch (err) { - // 异常时中断流 - ctrl.close(); + queue.length = 0; + ctrl.error(err); Log.error(err); } running--; @@ -65,18 +58,18 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { const runTask = createTaskQueue(concurrency); - for (const [, gData] of Object.entries(filterSegGroup)) { - for (const [, item] of gData.segments.entries()) { - const url = new URL(item.uri, base).href; - runTask( - () => (segmentBufferFetchqueue[url] = fetchSegmentBufferPromise(url)), - ); - } + for (const [, item] of segments.entries()) { + const url = new URL(item.uri, base).href; + runTask( + () => (segmentBufferFetchqueue[url] = fetchSegmentBufferPromise(url)), + ); } } async function getSegmentBuffer(url: string) { - return segmentBufferFetchqueue[url]; + const segmentBuffer = await segmentBufferFetchqueue[url]; + delete segmentBufferFetchqueue[url]; + return segmentBuffer; } return { @@ -139,7 +132,7 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { actualEndTime, stream: new ReadableStream({ start: async (ctrl) => { - downloadSegGroup(filterSegGroup, ctrl); + downloadSegments(segments, ctrl); ctrl.enqueue( new Uint8Array( await ( From a820688bb5c2171b8b82d28172071c8e56ea8d6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E7=97=95?= Date: Thu, 20 Jun 2024 19:23:50 +0800 Subject: [PATCH 4/5] typo --- packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts index 7a4f87bc..417bfd1c 100644 --- a/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts +++ b/packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts @@ -59,7 +59,7 @@ test('hls loader async load m4s files', async () => { video?.close(); }); -test('hla loader async load m4s files with error stop correctly', async () => { +test('hls loader async load m4s files with error stop correctly', async () => { const fetchSpy = vi.spyOn(globalThis, 'fetch'); try { const loader = await createHLSLoader(m3u8Url, 4); From f58342b0135a8a4255ecb447192315a2947fb4d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E7=97=95?= Date: Thu, 20 Jun 2024 19:25:01 +0800 Subject: [PATCH 5/5] clear task queue --- packages/av-cliper/src/data-loader/hls-loader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/av-cliper/src/data-loader/hls-loader.ts b/packages/av-cliper/src/data-loader/hls-loader.ts index 8021131e..a61fee4e 100644 --- a/packages/av-cliper/src/data-loader/hls-loader.ts +++ b/packages/av-cliper/src/data-loader/hls-loader.ts @@ -41,7 +41,7 @@ export async function createHLSLoader(m3u8URL: string, concurrency = 10) { await task?.(); next(); } catch (err) { - queue.length = 0; + queue = []; ctrl.error(err); Log.error(err); }