-
Notifications
You must be signed in to change notification settings - Fork 84
/
Copy pathstream.ts
76 lines (69 loc) · 1.79 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import { Readable } from 'node:stream'
/**
 * Options accepted by `toToReadable`. Every field is optional and is handed
 * to the `Readable` constructor under the same name.
 */
interface FromWebOptions {
  /** Passed as the `signal` option of the created `Readable`. */
  signal?: AbortSignal
  /** Passed as the `encoding` option of the created `Readable`. */
  encoding?: BufferEncoding
  /** Passed as the `highWaterMark` option of the created `Readable`. */
  highWaterMark?: number
  /** Passed as the `objectMode` option; `toToReadable` treats a missing value as `false`. */
  objectMode?: boolean
}
/**
 * Wraps a web `ReadableStream` in a Node.js `Readable`.
 *
 * Adapted from Node's own `stream.Readable.fromWeb()`, which cannot be used
 * directly because this code has to run on Node@14.
 *
 * @param webStream - Web stream to consume; a reader is acquired from it right away.
 * @param options - Values forwarded to the `Readable` constructor (`objectMode` defaults to `false`).
 * @returns A `Readable` fed by the chunks read from `webStream`.
 * @see https://github.com/nodejs/node/blob/bd462ad81bc30e547e52e699ee3b6fa3d7c882c9/lib/internal/webstreams/adapters.js#L458
 */
export function toToReadable(
  webStream: ReadableStream,
  options: FromWebOptions = {},
) {
  const reader = webStream.getReader()
  // Becomes true once `reader.closed` settles; `destroy` skips `reader.cancel()` after that.
  let closed = false
  const { highWaterMark, encoding, objectMode = false, signal } = options

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    // @ts-ignore signal exist only since Node@17
    signal,
    read() {
      reader.read().then(
        (result: any) => {
          // Pushing `null` ends the Readable; otherwise forward the chunk as-is.
          readable.push(result.done ? null : result.value)
        },
        (reason: any) => readable.destroy(reason),
      )
    },
    destroy(error: any, callback: (arg0: any) => void) {
      const finish = () => {
        try {
          callback(error)
        } catch (thrown) {
          // This runs inside a promise reaction, so rethrowing here would
          // surface as an unhandled rejection. Rethrow on the next tick to
          // leave the promise context and let it be handled separately.
          process.nextTick(() => {
            throw thrown
          })
        }
      }

      if (closed) {
        finish()
        return
      }
      // Report completion whether the cancellation fulfils or rejects.
      reader.cancel(error).then(finish, finish)
    },
  })

  const markClosed = () => {
    closed = true
  }
  reader.closed.then(markClosed, (reason: any) => {
    markClosed()
    readable.destroy(reason)
  })

  return readable
}