Skip to content

Commit

Permalink
quic: fixup application packet preparation
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jan 6, 2024
1 parent accec6d commit 4751763
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 71 deletions.
125 changes: 62 additions & 63 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,62 +236,64 @@ void Session::Application::StreamReset(Stream* stream,
}

void Session::Application::SendPendingData() {
static constexpr size_t kMaxPackets = 32;
Debug(session_, "Application sending pending data");
PathStorage path;
PathStorage prev_path;

Packet* packet = nullptr;
uint8_t* pos = nullptr;
uint8_t* begin = nullptr;
int err = 0;
size_t last_nwrite = 0;
StreamData stream_data;

// The maximum size of packet to create.
const size_t max_packet_size = session_->max_packet_size();

// The maximum number of packets to send in this call to SendPendingData.
const size_t max_packet_count = std::min(
static_cast<size_t>(64000),
kMaxPackets,
ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);

// The number of packets that have been sent in this call to SendPendingData.
size_t packet_send_count = 0;

Packet* packet = nullptr;
uint8_t* pos = nullptr;
uint8_t* begin = nullptr;

auto ensure_packet = [&] {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) return false;
pos = begin = ngtcp2_vec(*packet).base;
}
DCHECK_NOT_NULL(packet);
DCHECK_NOT_NULL(pos);
DCHECK_NOT_NULL(begin);
return true;
};

// We're going to enter a loop here to prepare and send no more than
// max_packet_count packets.
for (;;) {
// ndatalen is the amount of stream data that was accepted into the packet,
// if any.
// ndatalen is the amount of stream data that was accepted into the packet.
ssize_t ndatalen = 0;

// The stream_data is the next block of data from the application stream to
// send.
StreamData stream_data;
err = GetStreamData(&stream_data);
// Make sure we have a packet to write data into.
if (!ensure_packet()) {
Debug(session_, "Failed to create packet for stream data");
// Doh! Could not create a packet. Time to bail.
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}

if (err < 0) {
// The stream_data is the next block of data from the application stream.
if (GetStreamData(&stream_data) < 0) {
Debug(session_, "Application failed to get stream data");
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
Debug(session_, "Application failed to get stream data with error %s",
session_->last_error_);
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
}

// If we got here, we were at least successful in checking for stream data.
// There might not be any stream data to send.
Debug(session_, "Application using stream data: %s", stream_data);

// Now let's make sure we have a packet to write data into.
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
pos = begin = ngtcp2_vec(*packet).base;
}

DCHECK_NOT_NULL(pos);
DCHECK_NOT_NULL(begin);
DCHECK_NOT_NULL(stream_data.buf);

// Awesome, let's write our packet!
ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data);
Debug(session_, "Application accepted %zu bytes into packet", ndatalen);
Expand All @@ -304,7 +306,10 @@ void Session::Application::SendPendingData() {
// We could not write any data for this stream into the packet because
// the flow control for the stream itself indicates that the stream
// is blocked. We'll skip and move on to the next stream.
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
DCHECK(stream_data.stream);
session_->StreamDataBlocked(stream_data.id);
continue;
}
Expand All @@ -314,14 +319,16 @@ void Session::Application::SendPendingData() {
// any stream data!
Debug(session_, "Stream %" PRIi64 " should be closed for writing",
stream_data.id);
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
// If we got this response, then there should have been a stream associated
// with the stream_data, otherwise the response wouldn't make any sense.
DCHECK(stream_data.stream);
stream_data.stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
// This return value indicates that we should call into WriteVStream
// again to write more data into the same packet.
Debug(session_, "Application should write more to packet");
DCHECK_GE(ndatalen, 0);
if (!StreamCommit(&stream_data, ndatalen)) {
Expand All @@ -336,8 +343,8 @@ void Session::Application::SendPendingData() {
DCHECK_EQ(ndatalen, -1);
Debug(session_, "Application encountered error while writing packet: %s",
ngtcp2_strerror(nwrite));
packet->Done(UV_ECANCELED);
session_->SetLastError(QuicError::ForNgtcp2Error(nwrite));
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
} else if (ndatalen >= 0) {
// We wrote some data into the packet. We need to update the flow control
Expand All @@ -349,48 +356,41 @@ void Session::Application::SendPendingData() {
}

// When nwrite is zero, it means we are congestion limited.
// We should stop trying to send additional packets.
if (nwrite == 0) {
Debug(session_, "Congestion limited.");
// There might be a partial packet already prepared. If so, send it.
if (pos - begin) {
Debug(session_, "Packet has at least some data to send");
size_t datalen = pos - begin;
if (datalen) {
Debug(session_, "Packet has %zu bytes to send", datalen);
// At least some data had been written into the packet. We should send it.
packet->Truncate(pos - begin);
packet->Truncate(datalen);
session_->Send(packet, path);
} else {
packet->Done(UV_ECANCELED);
}

session_->UpdatePacketTxTime();
return;
// If there was stream data selected, we should reschedule it to try sending again.
if (stream_data.id >= 0) ResumeStream(stream_data.id);

return session_->UpdatePacketTxTime();
}

// At this point we have a packet prepared to send.
pos += nwrite;

if (packet_send_count == 0) {
path.CopyTo(&prev_path);
last_nwrite = nwrite;
} else if (prev_path != path ||
static_cast<size_t>(nwrite) > last_nwrite ||
(last_nwrite > max_packet_size && static_cast<size_t>(nwrite) != last_nwrite)) {
auto datalen = pos - begin - nwrite;
packet->Truncate(datalen);
session_->Send(packet->Clone(), prev_path);
session_->Send(packet, path);
session_->UpdatePacketTxTime();
packet = nullptr;
pos = nullptr;
begin = nullptr;
continue;
size_t datalen = pos - begin;
Debug(session_, "Sending packet with %zu bytes", datalen);
packet->Truncate(datalen);
session_->Send(packet, path);

// If we have sent the maximum number of packets, we're done.
if (++packet_send_count == max_packet_count) {
return session_->UpdatePacketTxTime();
}

if (++packet_send_count == max_packet_count || static_cast<size_t>(nwrite) < last_nwrite) {
auto datalen = pos - begin;
packet->Truncate(datalen);
session_->Send(packet, path);
session_->UpdatePacketTxTime();
return;
}
// Prepare to loop back around to prepare a new packet.
packet = nullptr;
pos = begin = nullptr;
}
}

Expand All @@ -403,7 +403,6 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
ngtcp2_pkt_info pi;
Debug(session_, "Writing max %zu bytes to packet", max_packet_size);
return ngtcp2_conn_writev_stream(
*session_,
&path->path,
Expand Down
15 changes: 7 additions & 8 deletions src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,27 +115,26 @@ class Session::Application : public MemoryRetainer {
// the default stream priority.
virtual StreamPriority GetStreamPriority(const Stream& stream);

protected:
inline Environment* env() const { return session_->env(); }
inline Session& session() { return *session_; }
inline const Session& session() const { return *session_; }

Packet* CreateStreamDataPacket();

struct StreamData;

virtual int GetStreamData(StreamData* data) = 0;
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
virtual bool ShouldSetFin(const StreamData& data) = 0;

inline Environment* env() const { return session_->env(); }
inline Session& session() { return *session_; }
inline const Session& session() const { return *session_; }

private:
Packet* CreateStreamDataPacket();

// Write the given stream_data into the buffer.
ssize_t WriteVStream(PathStorage* path,
uint8_t* buf,
ssize_t* ndatalen,
size_t max_packet_size,
const StreamData& stream_data);

private:
Session* session_;
};

Expand Down
1 change: 1 addition & 0 deletions src/quic/http3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ class Http3Application final : public Session::Application {
}
}
}
DCHECK_NOT_NULL(stream_data.buf);
return 0;
}

Expand Down

0 comments on commit 4751763

Please sign in to comment.