-
Notifications
You must be signed in to change notification settings - Fork 176
/
Copy pathVideoStream.c
417 lines (353 loc) · 14.6 KB
/
VideoStream.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
#include "Limelight-internal.h"
// Not referenced anywhere in the visible part of this file.
// NOTE(review): presumably a leftover size bound for the first frame - confirm before relying on it
#define FIRST_FRAME_MAX 1500
// Timeout, in seconds, used in this file for the TCP connect to
// FIRST_FRAME_PORT, for the wait for the first video packet, and for the
// wait for the first complete frame once data has started arriving.
#define FIRST_FRAME_TIMEOUT_SEC 10
// TCP port that startVideoStream() connects to when AppVersionQuad[0] == 3
#define FIRST_FRAME_PORT 47996
// Queue that received (and, if needed, decrypted) RTP video packets are added to
static RTP_VIDEO_QUEUE rtpQueue;
// UDP socket used both to send pings and to receive video packets
static SOCKET rtpSocket = INVALID_SOCKET;
// TCP socket opened to FIRST_FRAME_PORT; closed again by readFirstFrame()
static SOCKET firstFrameSocket = INVALID_SOCKET;
// Crypto context used to decrypt video packets when SS_ENC_VIDEO is enabled
static PPLT_CRYPTO_CONTEXT decryptionCtx;
// Worker threads running VideoPingThreadProc, VideoReceiveThreadProc and
// VideoDecoderThreadProc respectively
static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;
// Set once the receive thread has read its first datagram from rtpSocket
static bool receivedDataFromPeer;
// PltGetMillis() value captured when that first datagram arrived
static uint64_t firstDataTimeMs;
// Set by notifyKeyFrameReceived(); while false, the receive thread enforces
// the "no successful video frame" timeout
static bool receivedFullFrame;
// We can't request an IDR frame until the depacketizer knows
// that a packet was lost. This timeout bounds the time that
// the RTP queue will wait for missing/reordered packets.
#define RTP_QUEUE_DELAY 10
// This is the desired number of video packets that can be
// stored in the socket's receive buffer. 2048 is chosen
// because it should be large enough for all reasonable
// frame sizes (probably 2 or 3 frames) without using too
// much kernel memory with larger packet sizes. It also
// can smooth over transient pauses in network traffic
// and subsequent packet/frame bursts that follow.
#define RTP_RECV_PACKETS_BUFFERED 2048
// Set up the per-stream video state: the depacketizer (sized from the
// configured packet size), the RTP queue, the decryption context, and the
// progress flags consulted by the receive thread.
void initializeVideoStream(void) {
    initializeVideoDepacketizer(StreamConfig.packetSize);
    RtpvInitializeQueue(&rtpQueue);
    decryptionCtx = PltCreateCryptoContext();

    // Nothing has been received for this stream yet
    receivedDataFromPeer = receivedFullFrame = false;
    firstDataTimeMs = 0;
}
// Release what initializeVideoStream() created: the decryption context,
// the depacketizer, and the RTP queue (in that order).
void destroyVideoStream(void) {
    PltDestroyCryptoContext(decryptionCtx);

    destroyVideoDepacketizer();

    RtpvCleanupQueue(&rtpQueue);
}
// Ping thread body: every 500 ms, send a ping datagram from rtpSocket to
// the host's video port until the thread is interrupted.
static void VideoPingThreadProc(void* context) {
    // The ASCII bytes "PING" (no terminator), used when no ping payload is set
    char legacyPingData[] = { 'P', 'I', 'N', 'G' };
    LC_SOCKADDR pingDest;
    int pingsSent = 0;

    LC_ASSERT(VideoPortNumber != 0);

    // Target the remote host, but on the video port
    memcpy(&pingDest, &RemoteAddr, sizeof(pingDest));
    SET_PORT(&pingDest, VideoPortNumber);

    // Send failures are intentionally ignored here. Socket errors are dealt
    // with on the read side in the receive thread, which avoids trouble from
    // ICMP port unreachable messages caused by pinging before the host PC
    // has bound the port.
    while (!PltIsThreadInterrupted(&udpPingThread)) {
        if (VideoPingPayload.payload[0] == 0) {
            // No payload available: fall back to the legacy ping
            sendto(rtpSocket, legacyPingData, sizeof(legacyPingData), 0, (struct sockaddr*)&pingDest, AddrLen);
        }
        else {
            // Stamp the payload with a big-endian, 1-based sequence number
            pingsSent++;
            VideoPingPayload.sequenceNumber = BE32(pingsSent);

            sendto(rtpSocket, (char*)&VideoPingPayload, sizeof(VideoPingPayload), 0, (struct sockaddr*)&pingDest, AddrLen);
        }

        PltSleepMsInterruptible(&udpPingThread, 500);
    }
}
// Receive thread body: reads video datagrams from rtpSocket, decrypts them
// when video encryption is enabled, and adds them to the RTP queue. Runs
// until the thread is interrupted, a socket/allocation error occurs, or one
// of the startup timeouts expires (in which case the connection is
// reported as terminated through ListenerCallbacks).
static void VideoReceiveThreadProc(void* context) {
    const bool encrypted = !!(EncryptionFeaturesEnabled & SS_ENC_VIDEO);

    // Encrypted packets carry an extra header in front of the RTP data
    const size_t encHeaderSize = encrypted ? sizeof(ENC_VIDEO_HEADER) : 0;

    // Size of a plaintext packet, the smallest datagram worth processing,
    // the largest datagram we read, and the per-packet allocation (the
    // queue entry lives directly after the packet data).
    const int decryptedSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
    const int minSize = sizeof(RTP_PACKET) + encHeaderSize;
    const int receiveSize = decryptedSize + encHeaderSize;
    const int bufferSize = decryptedSize + sizeof(RTPV_QUEUE_ENTRY);

    char* buffer = NULL;
    char* encryptedBuffer = NULL;
    int waitingForVideoMs = 0;

    // Prefer SO_RCVTIMEO for recv(); if that cannot be set, fall back to
    // waiting with select()
    const bool useSelect = setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0;

    // Encrypted datagrams are received into a reusable staging buffer and
    // decrypted from there into the per-packet buffer
    if (encrypted) {
        encryptedBuffer = (char*)malloc(receiveSize);
        if (encryptedBuffer == NULL) {
            Limelog("Video Receive: malloc() failed\n");
            ListenerCallbacks.connectionTerminated(-1);
            return;
        }
    }

    while (!PltIsThreadInterrupted(&receiveThread)) {
        // A new packet buffer is needed whenever the queue kept the last one
        if (buffer == NULL) {
            buffer = (char*)malloc(bufferSize);
            if (buffer == NULL) {
                Limelog("Video Receive: malloc() failed\n");
                ListenerCallbacks.connectionTerminated(-1);
                break;
            }
        }

        int received = recvUdpSocket(rtpSocket,
                                     encrypted ? encryptedBuffer : buffer,
                                     receiveSize,
                                     useSelect);
        if (received < 0) {
            Limelog("Video Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError());
            ListenerCallbacks.connectionTerminated(LastSocketFail());
            break;
        }

        if (received == 0) {
            // The receive timed out. If no video packet has ever arrived and
            // we have now waited many seconds, assume something is broken
            // and terminate the connection.
            if (!receivedDataFromPeer) {
                waitingForVideoMs += UDP_RECV_POLL_TIMEOUT_MS;
                if (waitingForVideoMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) {
                    Limelog("Terminating connection due to lack of video traffic\n");
                    ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_TRAFFIC);
                    break;
                }
            }

            // Otherwise just poll again
            continue;
        }

        // Note when the very first datagram shows up
        if (!receivedDataFromPeer) {
            receivedDataFromPeer = true;
            Limelog("Received first video packet after %d ms\n", waitingForVideoMs);

            firstDataTimeMs = PltGetMillis();
        }

#ifndef LC_FUZZING
        // Data is flowing, but give up if no full frame has been produced
        // within the timeout measured from the first datagram
        if (!receivedFullFrame) {
            uint64_t now = PltGetMillis();

            if (now - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) {
                Limelog("Terminating connection due to lack of a successful video frame\n");
                ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_FRAME);
                break;
            }
        }
#endif

        // Ignore runt packets that cannot even hold the headers
        if (received < minSize) {
            continue;
        }

        // Decrypt the packet into the buffer if encryption is enabled
        if (encrypted) {
            PENC_VIDEO_HEADER encHeader = (PENC_VIDEO_HEADER)encryptedBuffer;

            // If this frame is below our current frame number, discard it before decryption
            // to save CPU cycles decrypting FEC shards for a frame we already reassembled.
            //
            // Since this is happening _before_ decryption, this packet is not trusted yet.
            // It's imperative that we do not mutate any state based on this packet until
            // after it has been decrypted successfully!
            //
            // It's possible for an attacker to inject a fake packet that has any value of
            // header fields they want, however this provides them no benefit because we will
            // simply drop said packet here (if it's below the current frame number) or it
            // will pass this check and be dropped during decryption (if contents is tampered)
            // or after decryption in the RTP queue (if it's a replay of a previous authentic
            // packet from the host).
            //
            // In short, an attacker spoofing this value via MITM or sending malicious values
            // impersonating the host from off-link doesn't gain them anything. If they have
            // a true MITM, they can DoS our connection by just dropping all our traffic, so
            // tampering with packets to fail this check doesn't accomplish anything they
            // couldn't already do. If they're not on-link, we just throw their malicious
            // traffic away (as mentioned in the paragraph above) and continue accepting
            // legitimate video traffic.
            if (encHeader->frameNumber && LE32(encHeader->frameNumber) < RtpvGetCurrentFrameNumber(&rtpQueue)) {
                continue;
            }

            // The ciphertext follows the header; on success the plaintext
            // length is written back into 'received'
            if (!PltDecryptMessage(decryptionCtx, ALGORITHM_AES_GCM, 0,
                                   (unsigned char*)StreamConfig.remoteInputAesKey, sizeof(StreamConfig.remoteInputAesKey),
                                   encHeader->iv, sizeof(encHeader->iv),
                                   encHeader->tag, sizeof(encHeader->tag),
                                   ((unsigned char*)(encHeader + 1)), received - sizeof(ENC_VIDEO_HEADER),
                                   (unsigned char*)buffer, &received)) {
                Limelog("Failed to decrypt video packet!\n");
                continue;
            }
        }

        // Convert the RTP header fields to host byte-order in place
        PRTP_PACKET packet = (PRTP_PACKET)buffer;
        packet->sequenceNumber = BE16(packet->sequenceNumber);
        packet->timestamp = BE32(packet->timestamp);
        packet->ssrc = BE32(packet->ssrc);

        // The queue entry storage sits right behind the packet data
        int queueStatus = RtpvAddPacket(&rtpQueue, packet, received, (PRTPV_QUEUE_ENTRY)(buffer + decryptedSize));
        if (queueStatus == RTPF_RET_QUEUED) {
            // The queue owns the buffer now, so allocate a new one next time
            buffer = NULL;
        }
    }

    // free(NULL) is a no-op, so no guards are required
    free(buffer);
    free(encryptedBuffer);
}
// Record that a full frame was received successfully. Once set, the
// receive thread no longer applies its "no successful video frame" timeout.
void notifyKeyFrameReceived(void) {
    receivedFullFrame = true;
}
// Decoder thread body: repeatedly waits for the next video frame, submits
// its decode unit to the renderer, and reports the result back. Exits when
// the thread is interrupted or when waiting for a frame fails.
static void VideoDecoderThreadProc(void* context) {
    VIDEO_FRAME_HANDLE frameHandle;
    PDECODE_UNIT decodeUnit;

    while (!PltIsThreadInterrupted(&decoderThread) &&
           LiWaitForNextVideoFrame(&frameHandle, &decodeUnit)) {
        // Hand the unit to the renderer and complete the frame with its status
        LiCompleteVideoFrame(frameHandle, VideoCallbacks.submitDecodeUnit(decodeUnit));
    }
}
// "Read" the first frame of the video stream. Nothing is actually read:
// closing the first-frame socket is the only thing that matters, because
// that is what starts the flow of video on Gen 3 servers.
// Always returns 0.
int readFirstFrame(void) {
    closeSocket(firstFrameSocket);
    firstFrameSocket = INVALID_SOCKET;

    return 0;
}
// Terminate the video stream
void stopVideoStream(void) {
if (!receivedDataFromPeer) {
Limelog("No video traffic was ever received from the host!\n");
}
VideoCallbacks.stop();
// Wake up client code that may be waiting on the decode unit queue
stopVideoDepacketizer();
PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
shutdownTcpSocket(firstFrameSocket);
}
PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
if (rtpSocket != INVALID_SOCKET) {
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
VideoCallbacks.cleanup();
}
// Start the video stream
int startVideoStream(void* rendererContext, int drFlags) {
int err;
firstFrameSocket = INVALID_SOCKET;
// This must be called before the decoder thread starts submitting
// decode units
LC_ASSERT(NegotiatedVideoFormat != 0);
err = VideoCallbacks.setup(NegotiatedVideoFormat, StreamConfig.width,
StreamConfig.height, StreamConfig.fps, rendererContext, drFlags);
if (err != 0) {
return err;
}
rtpSocket = bindUdpSocket(RemoteAddr.ss_family, &LocalAddr, AddrLen,
RTP_RECV_PACKETS_BUFFERED * (StreamConfig.packetSize + MAX_RTP_HEADER_SIZE),
SOCK_QOS_TYPE_VIDEO);
if (rtpSocket == INVALID_SOCKET) {
VideoCallbacks.cleanup();
return LastSocketError();
}
VideoCallbacks.start();
err = PltCreateThread("VideoRecv", VideoReceiveThreadProc, NULL, &receiveThread);
if (err != 0) {
VideoCallbacks.stop();
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return err;
}
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
err = PltCreateThread("VideoDec", VideoDecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
VideoCallbacks.stop();
PltInterruptThread(&receiveThread);
PltJoinThread(&receiveThread);
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return err;
}
}
if (AppVersionQuad[0] == 3) {
// Connect this socket to open port 47998 for our ping thread
firstFrameSocket = connectTcpSocket(&RemoteAddr, AddrLen,
FIRST_FRAME_PORT, FIRST_FRAME_TIMEOUT_SEC);
if (firstFrameSocket == INVALID_SOCKET) {
VideoCallbacks.stop();
stopVideoDepacketizer();
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return LastSocketError();
}
}
// Start pinging before reading the first frame so GFE knows where
// to send UDP data
err = PltCreateThread("VideoPing", VideoPingThreadProc, NULL, &udpPingThread);
if (err != 0) {
VideoCallbacks.stop();
stopVideoDepacketizer();
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
closeSocket(rtpSocket);
if (firstFrameSocket != INVALID_SOCKET) {
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
VideoCallbacks.cleanup();
return err;
}
if (AppVersionQuad[0] == 3) {
// Read the first frame to start the flow of video
err = readFirstFrame();
if (err != 0) {
stopVideoStream();
return err;
}
}
return 0;
}