-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathdecode-stream.test.ts
160 lines (142 loc) · 4.4 KB
/
decode-stream.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import {EventEmitter} from "stream"
import {decodeStream} from "./decode-stream"
import {StreamConnection} from "@onflow/typedefs"
import * as decodeResponseModule from "./decode"
describe("decode stream", () => {
let mockStream: StreamConnection<{data: any}>
let emitter: EventEmitter
let mockDecodeResponse
// Rebuilds every shared fixture before each test so that no mock state,
// listeners or spies carry over from one test case to the next.
beforeEach(() => {
  // Put back any decodeResponse spy installed for the previous test before
  // installing a new one, so the patched export never outlives its test setup.
  jest.restoreAllMocks()

  // Fresh decoder mock; each test supplies its own implementation.
  mockDecodeResponse = jest.fn()
  jest
    .spyOn(decodeResponseModule, "decodeResponse")
    .mockImplementation(mockDecodeResponse)

  // The mock stream forwards listener (un)registration to a real EventEmitter,
  // which lets tests drive the stream with emitter.emit(...).
  emitter = new EventEmitter()
  mockStream = {
    on: jest.fn((event, callback) => {
      emitter.on(event, callback)
    }) as any,
    off: jest.fn((event, callback) => {
      emitter.off(event, callback)
    }) as any,
    close: jest.fn(),
  }
})
// Emits one "data" event carrying two populated channels and one null channel,
// then checks that each populated channel is decoded on its own (together with
// the custom decoders) and delivered to that channel's listener, while the
// null channel is neither decoded nor delivered.
test("data is mapped to decoded data per channel for non-null values", async () => {
  const originalData = {
    dummy: {foo: "bar"},
    other: {foo: "baz"},
    nullExample: null,
  }
  const decodedData = {
    dummy: {foo2: "bar2"},
    other: {foo2: "baz2"},
  }

  // Decoder stub: returns the canned result for each known channel and throws
  // for any other payload.
  mockDecodeResponse.mockImplementation(response => {
    if (response.dummy) {
      return decodedData.dummy
    } else if (response.other) {
      return decodedData.other
    } else {
      throw new Error("unexpected response")
    }
  })

  const customDecoders = {
    foo: jest.fn(),
    bar: jest.fn(),
  }
  const decodedStream = decodeStream(
    mockStream,
    mockDecodeResponse,
    customDecoders
  )

  // Listeners are plain recorders. All assertions run from the test body
  // below, so a wrong payload fails this test directly instead of throwing
  // inside a listener that decodeStream invokes.
  const dummyCallback = jest.fn()
  const otherCallback = jest.fn()
  const nullCallback = jest.fn()
  decodedStream.on("dummy", dummyCallback)
  decodedStream.on("other", otherCallback)
  decodedStream.on("nullExample", nullCallback)

  emitter.emit("data", originalData)

  // wait for next tick
  await new Promise(resolve => setTimeout(resolve, 0))

  // Exactly one decode per non-null channel, each with only that channel's data
  expect(mockDecodeResponse).toHaveBeenCalledTimes(2)
  expect(mockDecodeResponse).toHaveBeenNthCalledWith(
    1,
    {dummy: originalData.dummy},
    customDecoders
  )
  expect(mockDecodeResponse).toHaveBeenNthCalledWith(
    2,
    {other: originalData.other},
    customDecoders
  )

  // Each listener fired once and received its channel's decoded payload
  expect(dummyCallback).toHaveBeenCalledTimes(1)
  expect(dummyCallback.mock.calls[0][0]).toEqual(decodedData.dummy)
  expect(otherCallback).toHaveBeenCalledTimes(1)
  expect(otherCallback.mock.calls[0][0]).toEqual(decodedData.other)
  expect(nullCallback).not.toHaveBeenCalled()
})
// Ordering check across messages: the first message decodes slowly and the
// second decodes immediately, yet listeners must still see them in the order
// they were emitted on the underlying stream.
test("data is emitted in order", async () => {
  const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))
  const firstMessage = {foo: "one"}
  const secondMessage = {bar: "two"}

  // "foo" payloads take 100 ms to decode; "bar" payloads resolve right away
  mockDecodeResponse.mockImplementation(async response => {
    if (response.foo) {
      await sleep(100)
      return response.foo
    }
    if (response.bar) {
      return response.bar
    }
    throw new Error("unexpected response")
  })

  const decodedStream = decodeStream(mockStream, mockDecodeResponse)

  // One recorder shared by both channels so the global call order is captured
  const received = jest.fn()
  decodedStream.on("foo", msg => received("foo", msg))
  decodedStream.on("bar", msg => received("bar", msg))

  emitter.emit("data", firstMessage)
  emitter.emit("data", secondMessage)

  // Wait for data to be processed
  await sleep(200)

  expect(received).toHaveBeenNthCalledWith(1, "foo", "one")
  expect(received).toHaveBeenNthCalledWith(2, "bar", "two")
})
// Ordering check across event kinds: data, error and close events are
// interleaved on the source stream, and every decode takes 100 ms. Listeners
// must still observe the events in the order they were emitted.
test("each channel is emitted in order", async () => {
  const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

  // Data will take time to decode but must arrive before error/close
  mockDecodeResponse.mockImplementation(async response => {
    await sleep(100)
    return response
  })

  const decodedStream = decodeStream(mockStream, mockDecodeResponse)

  // A single recorder for all events so their relative order is captured
  const recorded = jest.fn()
  decodedStream.on("foo", msg => recorded("foo", msg))
  decodedStream.on("bar", msg => recorded("bar", msg))
  decodedStream.on("error", err => recorded("error", err))
  decodedStream.on("close", () => recorded("close"))

  emitter.emit("data", {foo: "one"})
  emitter.emit("error", new Error("error"))
  emitter.emit("data", {bar: "two"})
  emitter.emit("close")

  // Wait for data to be processed
  await sleep(250)

  expect(recorded).toHaveBeenNthCalledWith(1, "foo", {foo: "one"})
  expect(recorded).toHaveBeenNthCalledWith(2, "error", new Error("error"))
  expect(recorded).toHaveBeenNthCalledWith(3, "bar", {bar: "two"})
  expect(recorded).toHaveBeenNthCalledWith(4, "close")
})
})