diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 8dcf6be33..48186bc29 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -812,10 +812,10 @@ export class Http2ServerCallStream< let pushedEnd = false; - const maybePushEnd = () => { + const maybePushEnd = async () => { if (!pushedEnd && readsDone && !pendingMessageProcessing) { pushedEnd = true; - this.pushOrBufferMessage(readable, null); + await this.pushOrBufferMessage(readable, null); } }; @@ -848,16 +848,16 @@ export class Http2ServerCallStream< // Just return early if (!decompressedMessage) return; - this.pushOrBufferMessage(readable, decompressedMessage); + await this.pushOrBufferMessage(readable, decompressedMessage); } pendingMessageProcessing = false; this.stream.resume(); - maybePushEnd(); + await maybePushEnd(); }); - this.stream.once('end', () => { + this.stream.once('end', async () => { readsDone = true; - maybePushEnd(); + await maybePushEnd(); }); } @@ -881,16 +881,16 @@ export class Http2ServerCallStream< return this.canPush; } - private pushOrBufferMessage( + private async pushOrBufferMessage( readable: | ServerReadableStream | ServerDuplexStream, messageBytes: Buffer | null - ): void { + ): Promise { if (this.isPushPending) { this.bufferedMessages.push(messageBytes); } else { - this.pushMessage(readable, messageBytes); + await this.pushMessage(readable, messageBytes); } } @@ -943,7 +943,7 @@ export class Http2ServerCallStream< this.isPushPending = false; if (this.bufferedMessages.length > 0) { - this.pushMessage( + await this.pushMessage( readable, this.bufferedMessages.shift() as Buffer | null );