Skip to content

Commit

Permalink
Add CordInputStream and CordOutputStream providing stream support dir…
Browse files Browse the repository at this point in the history
…ectly from/to Cord data

PiperOrigin-RevId: 493150162
  • Loading branch information
martijnvels authored and copybara-github committed Dec 6, 2022
1 parent 474152d commit 8afd1b6
Show file tree
Hide file tree
Showing 4 changed files with 914 additions and 0 deletions.
213 changes: 213 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_impl_lite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,219 @@ bool LimitingInputStream::ReadCord(absl::Cord* cord, int count) {


// ===================================================================
// Starts reading at the first character of `*cord`. The whole cord is still
// unread at this point, so the remaining byte count equals the cord size.
CordInputStream::CordInputStream(const absl::Cord* cord)
    : it_(cord->char_begin()),
      length_(cord->size()),
      bytes_remaining_(cord->size()) {
  // Prime `data_`, `size_` and `available_` with the first chunk (if any).
  LoadChunkData();
}

bool CordInputStream::LoadChunkData() {
if (bytes_remaining_ != 0) {
absl::string_view sv = absl::Cord::ChunkRemaining(it_);
data_ = sv.data();
size_ = available_ = sv.size();
return true;
}
size_ = available_ = 0;
return false;
}

// Advances `it_` past the consumed part of the current chunk plus `skip`
// additional bytes, then loads the chunk found at the new position.
// Returns false if the stream is at EOF or no data remains after advancing.
bool CordInputStream::NextChunk(size_t skip) {
  // An empty current chunk (`size_ == 0`) means we already reached EOF.
  if (size_ == 0) return false;

  // Bytes handed out from the current chunk have been subtracted from
  // `bytes_remaining_` already, but the iterator has not been moved over
  // them yet. Move it over those bytes and the requested extra `skip`.
  const size_t consumed = size_ - available_;
  absl::Cord::Advance(&it_, consumed + skip);
  bytes_remaining_ -= skip;

  return LoadChunkData();
}

// Hands out the unread remainder of the current chunk, loading the next chunk
// first if the current one is exhausted.
//
// On success stores the start of the region in `*data`, its length in `*size`
// and returns true. Returns false at EOF, leaving `*data` / `*size` untouched.
bool CordInputStream::Next(const void** data, int* size) {
  if (available_ == 0 && !NextChunk(0)) return false;

  // `*size` is an `int` while chunk sizes are `size_t`. Cap what is handed out
  // at INT_MAX so that an oversized chunk cannot wrap into a negative or
  // truncated size; the rest of the chunk stays in `available_` and is served
  // by subsequent calls.
  const size_t max_int = static_cast<size_t>(std::numeric_limits<int>::max());
  const size_t n = std::min(available_, max_int);

  *data = data_ + (size_ - available_);
  *size = static_cast<int>(n);
  bytes_remaining_ -= n;
  available_ -= n;
  return true;
}

// Returns the last `count` bytes handed out by `Next()` to the stream so they
// are served again.
void CordInputStream::BackUp(int count) {
  // Only bytes already consumed from the current chunk can be given back.
  GOOGLE_CHECK_LE(static_cast<size_t>(count), size_ - available_);

  // Undo the accounting done when the bytes were handed out.
  bytes_remaining_ += count;
  available_ += count;
}

// Skips `count` bytes. Returns true if that many bytes were skipped, or false
// if the end of the stream was reached first (the stream is then positioned
// at EOF).
bool CordInputStream::Skip(int count) {
  const size_t n = static_cast<size_t>(count);

  // Fast path: the skip stays within the unread part of the current chunk.
  if (n <= available_) {
    available_ -= n;
    bytes_remaining_ -= n;
    return true;
  }

  // Otherwise move the iterator. A skip that lands exactly on EOF still counts
  // as success, so the result of `NextChunk()` is deliberately ignored; a skip
  // beyond the remaining data is clamped to the end and reported as failure.
  const bool within_stream = n <= bytes_remaining_;
  NextChunk(within_stream ? n : bytes_remaining_);
  return within_stream;
}

// Returns the number of bytes consumed so far: the total cord length minus
// what has not yet been handed out (or skipped).
int64_t CordInputStream::ByteCount() const {
  const size_t consumed = length_ - bytes_remaining_;
  return static_cast<int64_t>(consumed);
}

// Appends up to `count` bytes from the current position to `*cord`.
// Returns true only if exactly `count` bytes were available and appended;
// otherwise appends whatever remains and returns false.
bool CordInputStream::ReadCord(absl::Cord* cord, int count) {
  // Bring the iterator up to date with the bytes already consumed from the
  // current chunk.
  absl::Cord::Advance(&it_, size_ - available_);

  // Never read past the end of the cord: clamp the request to what is left.
  const size_t requested = static_cast<size_t>(count);
  const size_t n = requested < bytes_remaining_ ? requested : bytes_remaining_;
  cord->Append(absl::Cord::AdvanceAndRead(&it_, n));
  bytes_remaining_ -= n;

  // The iterator moved: reload the chunk view at the new position.
  LoadChunkData();

  return n == requested;
}


// Creates an empty output stream; only the size hint is recorded.
CordOutputStream::CordOutputStream(size_t size_hint)
    : size_hint_(size_hint) {}

// Takes ownership of `cord` as the initial stream contents. A non-empty cord
// puts the stream in the `kSteal` state so the first `Next()` asks the cord
// for an append buffer; an empty cord starts out as `kEmpty`.
CordOutputStream::CordOutputStream(absl::Cord cord, size_t size_hint)
    : cord_(std::move(cord)),
      size_hint_(size_hint),
      state_(!cord_.empty() ? State::kSteal : State::kEmpty) {}

// Takes ownership of `buffer` as the initial write buffer. The state reflects
// whether the donated buffer still has spare capacity (`kPartial`) or not
// (`kFull`). `state_` is initialized from the `buffer` parameter, which is
// only moved from afterwards by the `buffer_` initializer.
CordOutputStream::CordOutputStream(absl::CordBuffer buffer, size_t size_hint)
    : size_hint_(size_hint),
      state_(buffer.length() >= buffer.capacity() ? State::kFull
                                                  : State::kPartial),
      buffer_(std::move(buffer)) {}

// Takes ownership of both an initial `cord` and an initial write `buffer`.
// The state depends on the donated buffer only: `kPartial` if it has spare
// capacity, `kFull` otherwise. `state_` is initialized from the `buffer`
// parameter, which is only moved from afterwards by the `buffer_` initializer.
CordOutputStream::CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
                                   size_t size_hint)
    : cord_(std::move(cord)),
      size_hint_(size_hint),
      state_(buffer.length() >= buffer.capacity() ? State::kFull
                                                  : State::kPartial),
      buffer_(std::move(buffer)) {}

// Provides the caller with a writable region. Stores the region start in
// `*data`, its length in `*size`, and always returns true. The handed-out
// bytes are counted as written immediately; callers return unused bytes via
// `BackUp()`.
bool CordOutputStream::Next(void** data, int* size) {
  // Without an application provided size hint, never ask for less than 128
  // bytes. The value is somewhat arbitrary: small enough to not waste much on
  // small data, large enough to avoid the CPU and memory overhead of tiny
  // buffers. Note that absent a size hint the requested size is the current
  // total size (capped at max flat size), so buffers quickly double in size.
  // This differs from the `Cord::Append()` overloads taking strings, which
  // grow by a conservative 10%.
  static const size_t kMinBlockSize = 128;

  const size_t bytes_so_far = cord_.size() + buffer_.length();

  // Below the size hint: aim for the hint exactly, so that the caller is not
  // given more than indicated (which would force a non-zero `BackUp()`).
  // At or past the hint, or without one: ask for as much as we have so far,
  // but at least kMinBlockSize. CordBuffer truncates oversized requests.
  const bool below_hint = size_hint_ > bytes_so_far;
  const size_t desired_size = below_hint
                                  ? size_hint_ - bytes_so_far
                                  : std::max(bytes_so_far, kMinBlockSize);
  const size_t max_size =
      below_hint ? desired_size : std::numeric_limits<size_t>::max();

  switch (state_) {
    case State::kSteal:
      // Try to reuse spare capacity at the end of the cord.
      assert(buffer_.length() == 0);
      buffer_ = cord_.GetAppendBuffer(desired_size);
      break;
    case State::kPartial:
      // `buffer_` still has room: keep writing into it.
      assert(buffer_.length() < buffer_.capacity());
      break;
    case State::kFull:
      // Commit the full buffer to the cord, then allocate a fresh one below.
      assert(buffer_.length() > 0);
      cord_.Append(std::move(buffer_));
      PROTOBUF_FALLTHROUGH_INTENDED;
    case State::kEmpty:
      assert(buffer_.length() == 0);
      buffer_ = absl::CordBuffer::CreateWithDefaultLimit(desired_size);
      break;
  }

  // Expose the spare capacity of the buffer to the caller.
  absl::Span<char> span = buffer_.available();
  assert(!span.empty());
  *data = span.data();

  // Hand out at most `max_size` bytes. If the buffer has more room than that,
  // it stays partially filled; otherwise it is now full.
  const bool truncated = span.size() > max_size;
  const size_t granted = truncated ? max_size : span.size();
  *size = static_cast<int>(granted);
  buffer_.IncreaseLengthBy(granted);
  state_ = truncated ? State::kPartial : State::kFull;

  return true;
}

// Returns the last `count` bytes handed out by `Next()` as unwritten.
//
// `count` must be non-negative and must not exceed the size of the region
// returned by the last `Next()` call; both are asserted in debug builds.
void CordOutputStream::BackUp(int count) {
  // Check if something to do, else state remains unchanged.
  assert(0 <= count && count <= ByteCount());
  if (count == 0) return;

  // BackUp() is not supposed to back up beyond the last Next() call, i.e. the
  // bytes to give back are expected to all live in `buffer_`.
  const int buffer_length = static_cast<int>(buffer_.length());
  assert(count <= buffer_length);
  if (count <= buffer_length) {
    buffer_.SetLength(static_cast<size_t>(buffer_length - count));
    state_ = State::kPartial;
  } else {
    // Fallback for builds without assertions: the request reaches into data
    // already committed to `cord_`. Dropping `buffer_` gives back
    // `buffer_length` bytes, so only the remainder must be removed from the
    // cord; removing the full `count` would discard `buffer_length` bytes
    // too many.
    buffer_ = {};
    cord_.RemoveSuffix(static_cast<size_t>(count - buffer_length));
    state_ = State::kSteal;
  }
}

// Returns the total number of bytes in the stream: what is already in the
// cord plus the current length of the pending buffer.
int64_t CordOutputStream::ByteCount() const {
  const size_t total = cord_.size() + buffer_.length();
  return static_cast<int64_t>(total);
}

// Appends `cord` to the stream contents. Always returns true.
bool CordOutputStream::WriteCord(const absl::Cord& cord) {
  // The next `Next()` call should first try to use existing capacity at the
  // end of `cord_`.
  state_ = State::kSteal;

  // Commit the pending buffer before the new data to preserve byte order.
  cord_.Append(std::move(buffer_));
  cord_.Append(cord);
  return true;
}

// Commits the pending buffer and hands the accumulated cord to the caller.
// The stream is reset to the `kEmpty` state afterwards.
absl::Cord CordOutputStream::Consume() {
  cord_.Append(std::move(buffer_));
  state_ = State::kEmpty;

  absl::Cord result = std::move(cord_);
  return result;
}


} // namespace io
} // namespace protobuf
Expand Down
128 changes: 128 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_impl_lite.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,134 @@ class PROTOBUF_EXPORT LimitingInputStream PROTOBUF_FUTURE_FINAL
int64_t prior_bytes_read_; // Bytes read on underlying stream at construction
};

// ===================================================================

// A ZeroCopyInputStream backed by a Cord. This stream implements ReadCord()
// in a way that can share memory between the source and destination cords
// rather than copying.
class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream {
 public:
  // Creates an InputStream that reads from the given Cord. `cord` must
  // not be null and must outlive this CordInputStream instance. `cord` must
  // not be modified while this instance is actively being used: any change
  // to `cord` will lead to undefined behavior on any subsequent call into
  // this instance.
  explicit CordInputStream(
      const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND);


  // `CordInputStream` is neither copyable nor assignable
  CordInputStream(const CordInputStream&) = delete;
  CordInputStream& operator=(const CordInputStream&) = delete;

  // implements ZeroCopyInputStream ----------------------------------
  bool Next(const void** data, int* size) override;
  void BackUp(int count) override;
  bool Skip(int count) override;
  int64_t ByteCount() const override;
  bool ReadCord(absl::Cord* cord, int count) override;


 private:
  // Moves `it_` to the next available chunk skipping `skip` extra bytes
  // and updates the chunk data pointers.
  bool NextChunk(size_t skip);

  // Updates the current chunk data context `data_`, `size_` and `available_`.
  // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero.
  // Returns true if more data is available, false otherwise.
  bool LoadChunkData();

  // Iterator marking where the current chunk view (`data_`) starts. It lags
  // behind the logical read position by `size_ - available_` bytes.
  absl::Cord::CharIterator it_;
  // Total size of the cord at construction time.
  size_t length_;
  // Number of bytes not yet handed out, skipped or read.
  size_t bytes_remaining_;
  // Start of the current chunk view. Explicitly initialized so the member is
  // never indeterminate when the stream is created on an empty cord.
  const char* data_ = nullptr;
  // Size of the current chunk view; zero indicates EOF.
  size_t size_ = 0;
  // Number of trailing bytes of the current chunk view not yet consumed.
  size_t available_ = 0;
};

// ===================================================================

// A ZeroCopyOutputStream that writes to a Cord. This stream implements
// WriteCord() in a way that can share memory between the source and
// destination cords rather than copying.
class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream {
public:
// Creates an OutputStream streaming serialized data into a Cord. `size_hint`,
// if given, is the expected total size of the resulting Cord. This is a hint
// only, used for optimization. Callers can obtain the generated Cord value by
// invoking `Consume()`.
explicit CordOutputStream(size_t size_hint = 0);

// Creates an OutputStream with an initial Cord value. This constructor can be
// used by applications wanting to directly append serialization data to a
// given cord. In such cases, donating the existing value as in:
//
//   CordOutputStream stream(std::move(cord));
//   message.SerializeToZeroCopyStream(&stream);
//   cord = stream.Consume();
//
// is more efficient than appending the serialized cord in application code:
//
//   CordOutputStream stream;
//   message.SerializeToZeroCopyStream(&stream);
//   cord.Append(stream.Consume());
//
// The former allows `CordOutputStream` to utilize pre-existing privately
// owned Cord buffers from the donated cord where the latter does not, which
// may lead to more memory usage when serializing data into existing cords.
explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0);

// Creates an OutputStream with an initial Cord value and initial buffer.
// This donates both the pre-existing cord in `cord`, as well as any
// pre-existing data and additional capacity in `buffer`.
// This function is mainly intended to be used in internal serialization logic
// using eager buffer initialization in EpsCopyOutputStream.
// The donated buffer can be empty, partially filled or full: the output
// stream handles all cases correctly and preserves any pre-existing data.
explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
size_t size_hint = 0);

// Creates an OutputStream with an initial buffer.
// This method is logically identical to, but more efficient than:
//   `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)`
explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0);

// `CordOutputStream` is neither copyable nor assignable
CordOutputStream(const CordOutputStream&) = delete;
CordOutputStream& operator=(const CordOutputStream&) = delete;

// implements `ZeroCopyOutputStream` ---------------------------------
bool Next(void** data, int* size) final;
void BackUp(int count) final;
int64_t ByteCount() const final;
bool WriteCord(const absl::Cord& cord) final;

// Consumes the serialized data as a cord value. `Consume()` internally
// flushes any pending state 'as if' BackUp(0) was called. While a final call
// to BackUp() is generally required by the `ZeroCopyOutputStream` contract,
// applications using `CordOutputStream` directly can call `Consume()` without
// a preceding call to `BackUp()`.
//
// While it will rarely be useful in practice (and especially in the presence
// of size hints) an instance is safe to be used after a call to `Consume()`.
// The only logical change in state is that all serialized data is extracted,
// and any new serialization calls will serialize into new cord data.
absl::Cord Consume();

private:
// State of `buffer_` and `cord_`. As a default CordBuffer instance always has
// inlined capacity, we track state explicitly to avoid returning 'existing
// capacity' from the default or 'moved from' CordBuffer. `kSteal` indicates
// we should (attempt to) steal the next buffer from the cord.
enum class State { kEmpty, kFull, kPartial, kSteal };

// NOTE: the constructors rely on this declaration order. `state_` is
// initialized before `buffer_`, so it can inspect a donated buffer before
// that buffer is moved into `buffer_`.

// Data committed to the stream so far; returned by `Consume()`.
absl::Cord cord_;
// Expected total size of the resulting cord; 0 means no hint was given.
size_t size_hint_;
State state_ = State::kEmpty;
// Buffer most recently handed out by `Next()`, not yet appended to `cord_`.
absl::CordBuffer buffer_;
};


// ===================================================================

Expand Down
Loading

0 comments on commit 8afd1b6

Please sign in to comment.