Skip to content

Commit

Permalink
Discard excess read bytes in LengthPrefixedMessageReader. (grpc#781)
Browse files Browse the repository at this point in the history
It is possible, in some use-cases, for LengthPrefixedMessageReader to
build up a buffer with loads of "free" space at the front that is not
usable, but that cannot be freed due to incoming messages arriving
sufficiently quickly.

We should, from time to time, try to discard those messages by
compacting the buffer. It's hard to have a good heuristic here, so this
patch represents a first-pass: if there's more than 1kB of usable space
in the buffer that we could free up with compaction, and if that space
is larger than the usable bytes in the buffer, we should try to obtain
it.

This effectively doubles the size of the buffer "for free", which is
what a resizing would do anyway. It also makes a subsequent resizing
cheaper, as we no longer need to copy the leading bytes to preserve the
reader index.
  • Loading branch information
Lukasa authored May 4, 2020
1 parent a542b16 commit d975125
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
17 changes: 16 additions & 1 deletion Sources/GRPC/LengthPrefixedMessageReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ internal struct LengthPrefixedMessageReader {
return self.buffer.map { $0.readableBytes } ?? 0
}

/// Returns the number of bytes that have been consumed and not discarded.
internal var _consumedNonDiscardedBytes: Int {
return self.buffer.map { $0.readerIndex } ?? 0
}

/// Whether the reader is mid-way through reading a message.
internal var isReading: Bool {
switch self.state {
Expand Down Expand Up @@ -127,8 +132,18 @@ internal struct LengthPrefixedMessageReader {
///
/// This allows the next call to `append` to avoid writing the contents of the appended buffer.
private mutating func nilBufferIfPossible() {
if self.buffer?.readableBytes == 0 {
let readableBytes = self.buffer?.readableBytes ?? 0
let readerIndex = self.buffer?.readerIndex ?? 0
let capacity = self.buffer?.capacity ?? 0

if readableBytes == 0 {
self.buffer = nil
} else if readerIndex > 1024 && readerIndex > (capacity / 2) {
// A rough-heuristic: if there is a kilobyte of read data, and there is more data that
// has been read than there is space in the rest of the buffer, we'll try to discard some
// read bytes here. We're trying to avoid doing this if there is loads of writable bytes that
// we'll have to shift.
self.buffer?.discardReadBytes()
}
}

Expand Down
24 changes: 24 additions & 0 deletions Tests/GRPCTests/LengthPrefixedMessageReaderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,28 @@ class LengthPrefixedMessageReaderTests: GRPCTestCase {

XCTAssertEqual(0, buffer.readableBytes)
}

func testExcessiveBytesAreDiscarded() throws {
// We're going to use a 1kB message here for ease of testing.
let message = Array(repeating: UInt8(0), count: 1024)
let largeMessage: [UInt8] = [
0x00, // 1-byte compression flag
0x00, 0x00, 0x04, 0x00, // 4-byte message length (1024)
] + message
var buffer = byteBuffer(withBytes: largeMessage)
buffer.writeBytes(largeMessage)
buffer.writeBytes(largeMessage)
reader.append(buffer: &buffer)

XCTAssertEqual(reader.unprocessedBytes, (1024 + 5) * 3)
XCTAssertEqual(reader._consumedNonDiscardedBytes, 0)

self.assertMessagesEqual(expected: message, actual: try reader.nextMessage())
XCTAssertEqual(reader.unprocessedBytes, (1024 + 5) * 2)
XCTAssertEqual(reader._consumedNonDiscardedBytes, 1024 + 5)

self.assertMessagesEqual(expected: message, actual: try reader.nextMessage())
XCTAssertEqual(reader.unprocessedBytes, 1024 + 5)
XCTAssertEqual(reader._consumedNonDiscardedBytes, 0)
}
}
1 change: 1 addition & 0 deletions Tests/GRPCTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ extension LengthPrefixedMessageReaderTests {
// to regenerate.
static let __allTests__LengthPrefixedMessageReaderTests = [
("testAppendReadsAllBytes", testAppendReadsAllBytes),
("testExcessiveBytesAreDiscarded", testExcessiveBytesAreDiscarded),
("testNextMessageDeliveredAcrossMultipleByteBuffers", testNextMessageDeliveredAcrossMultipleByteBuffers),
("testNextMessageDoesNotThrowWhenCompressionFlagIsExpectedButNotSet", testNextMessageDoesNotThrowWhenCompressionFlagIsExpectedButNotSet),
("testNextMessageReturnsMessageForZeroLengthMessage", testNextMessageReturnsMessageForZeroLengthMessage),
Expand Down

0 comments on commit d975125

Please sign in to comment.