Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(av-cliper): concurrency download for hls-loader #140

Merged
merged 5 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/av-cliper/src/data-loader/__tests__/hls-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@ test('hls loader default time', async () => {
expect(video?.timestamp).toBe(10e6);
video?.close();
});

test('hls loader async load m4s files', async () => {
const loader = await createHLSLoader(m3u8Url, 5);
const [{ actualStartTime, actualEndTime, stream }] = loader.load() ?? [];
expect(stream).toBeInstanceOf(ReadableStream);
expect([actualStartTime, Math.round(actualEndTime / 1e6)]).toEqual([0, 60]);

const clip = new MP4Clip(stream);
await clip.ready;
expect(Math.round(clip.meta.duration / 1e6)).toBe(
Math.round((actualEndTime - actualStartTime) / 1e6),
);

const { video } = await clip.tick(10e6);
expect(video?.timestamp).toBe(10e6);
video?.close();
});
63 changes: 55 additions & 8 deletions packages/av-cliper/src/data-loader/hls-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Parser } from 'm3u8-parser';
/**
* 创建一个 HLS 资源加载器
*/
export async function createHLSLoader(m3u8URL: string) {
export async function createHLSLoader(m3u8URL: string, concurrency = 10) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

参数 concurrency = 10 移到内部,默认不对外开放,若有有需求再讨论开放。

可以简化 API 、降低破坏性变更几率。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要,不同业务方期望的并行数量应该是不一样的

const parser = new Parser();
parser.push(await (await fetch(m3u8URL)).text());
parser.end();
Expand All @@ -17,12 +17,64 @@ export async function createHLSLoader(m3u8URL: string) {
);
const base = new URL(m3u8URL, location.href);

const segmentBufferFetchqueue = {} as Record<string, Promise<ArrayBuffer>>;

async function downloadSegGroup() {
function createTaskQueue(concurrency: number) {
let running = 0;
const queue = [] as Function[];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

替换为 as Array<() => Promise<void>>


async function runTask(task: Function) {
queue.push(task);
next();
}

async function next() {
if (running < concurrency && queue.length) {
const task = queue.shift();
running++;
try {
if (task) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

替换为 await task?.()

await task();
}
} catch (err) {
console.error(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

替换为 Log.error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

异常时需要中断流,否则流永远不会结束,且使用方无法感知到错误。

}
running--;
next();
}
}

return runTask;
}

async function fetchSegmentBufferPromise(url: string) {
return (await fetch(url)).arrayBuffer();
}

const runTask = createTaskQueue(concurrency);

for (const [, gData] of Object.entries(segGroup)) {
Copy link
Member

@hughfenghen hughfenghen Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

复用 load 中转换的数据 filterSegGroup,有些片段是无需下载的。

for (const [, item] of gData.entries()) {
const url = new URL(item.uri, base).href;
runTask(
() => (segmentBufferFetchqueue[url] = fetchSegmentBufferPromise(url)),
);
}
}
}

async function getSegmentBuffer(url: string) {
return segmentBufferFetchqueue[url].then((value) => value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

替换为 return await segmentBufferFetchqueue[url]

}

return {
/**
* 下载期望时间区间的分配数据,封装成流
* 每个分片包含一个时间段,实际下载的分片数据时长会略大于期望的时间区间
*/
load(expectStartTime = 0, expectEndTime = Infinity) {
downloadSegGroup();
const filterSegGroup = {} as Record<
string,
{
Expand Down Expand Up @@ -86,13 +138,8 @@ export async function createHLSLoader(m3u8URL: string) {
);
},
pull: async (ctrl) => {
ctrl.enqueue(
new Uint8Array(
await (
await fetch(new URL(segments[segIdx].uri, base).href)
).arrayBuffer(),
),
);
const url = new URL(segments[segIdx].uri, base).href;
ctrl.enqueue(new Uint8Array(await getSegmentBuffer(url)));
segIdx += 1;
if (segIdx >= segments.length) ctrl.close();
},
Expand Down