Skip to content

Commit

Permalink
rpcdaemon: move HTTP chunk handling in http::Connection (#1811)
Browse files Browse the repository at this point in the history
  • Loading branch information
lupin012 authored Feb 9, 2024
1 parent ce96fde commit 850e3e7
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 176 deletions.
1 change: 0 additions & 1 deletion silkworm/rpc/http/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class Channel : public StreamWriter {
Channel(const Channel&) = delete;
Channel& operator=(const Channel&) = delete;

virtual Task<void> open_stream() = 0;
virtual Task<void> write_rsp(const std::string& content) = 0;
};

Expand Down
17 changes: 16 additions & 1 deletion silkworm/rpc/http/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <string_view>

#include <absl/strings/str_join.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/http/chunk_encode.hpp>
#include <boost/beast/http/write.hpp>
#include <jwt-cpp/jwt.h>
#include <jwt-cpp/traits/nlohmann-json/defaults.h>
Expand Down Expand Up @@ -154,12 +156,25 @@ Task<void> Connection::open_stream() {
}
co_return;
}
Task<void> Connection::close_stream() {
try {
co_await boost::asio::async_write(socket_, boost::beast::http::make_chunk_last(), boost::asio::use_awaitable);
} catch (const boost::system::system_error& se) {
SILK_TRACE << "Connection::close system_error: " << se.what();
throw;
} catch (const std::exception& e) {
SILK_ERROR << "Connection::close exception: " << e.what();
throw;
}
co_return;
}

//! Write chunked response content to the underlying socket
Task<std::size_t> Connection::write(std::string_view content) {
unsigned long bytes_transferred{0};
try {
bytes_transferred = co_await boost::asio::async_write(socket_, boost::asio::buffer(content), boost::asio::use_awaitable);
boost::asio::const_buffer buffer{content.data(), content.size()};
bytes_transferred = co_await boost::asio::async_write(socket_, boost::beast::http::chunk_body(buffer), boost::asio::use_awaitable);
} catch (const boost::system::system_error& se) {
SILK_TRACE << "Connection::write system_error: " << se.what();
throw;
Expand Down
2 changes: 1 addition & 1 deletion silkworm/rpc/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Connection : public Channel {
Task<void> read_loop();

Task<void> open_stream() override;
Task<void> close() override { co_return; }
Task<void> close_stream() override;

Task<std::size_t> write(std::string_view content) override;
Task<void> write_rsp(const std::string& content) override;
Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/http/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ Task<void> RequestHandler::handle_request(commands::RpcApiTable::HandleStream ha
auto io_executor = co_await boost::asio::this_coro::executor;

try {
co_await channel_->open_stream();
ChunkWriter chunk_writer(*channel_);
json::Stream stream(io_executor, chunk_writer);
json::Stream stream(io_executor, *channel_);
co_await stream.open();

try {
co_await (rpc_api_.*handler)(request_json, stream);
Expand Down
6 changes: 5 additions & 1 deletion silkworm/rpc/json/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std
buffer_.reserve(buffer_capacity_ + buffer_capacity_ / 4);
}

Task<void> Stream::open() {
co_await writer_.open_stream();
}

Task<void> Stream::close() {
if (!buffer_.empty()) {
co_await do_async_write(std::make_shared<std::string>(std::move(buffer_)));
Expand All @@ -86,7 +90,7 @@ Task<void> Stream::close() {
co_await run_completion_channel_.async_receive(boost::asio::use_awaitable);
#endif // _WIN32

co_await writer_.close();
co_await writer_.close_stream();
}

void Stream::open_object() {
Expand Down
2 changes: 2 additions & 0 deletions silkworm/rpc/json/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Stream {
Stream(const Stream& stream) = delete;
Stream& operator=(const Stream&) = delete;

Task<void> open();

//! Flush any remaining data and close properly as per the underlying transport
Task<void> close();

Expand Down
9 changes: 4 additions & 5 deletions silkworm/rpc/json/stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
boost::asio::any_io_executor io_executor = io_context_.get_executor();

StringWriter string_writer;
ChunkWriter chunk_writer(string_writer);

SECTION("write_json in string") {
Stream stream(io_executor, string_writer);
Expand All @@ -53,7 +52,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
CHECK(string_writer.get_content() == "{\"test\":\"test\"}");
}
SECTION("write_json in 1 chunk") {
Stream stream(io_executor, chunk_writer);
Stream stream(io_executor, string_writer);

nlohmann::json json = R"({
"test": "test"
Expand All @@ -62,10 +61,10 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
stream.write_json(json);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "f\r\n{\"test\":\"test\"}\r\n0\r\n\r\n");
CHECK(string_writer.get_content() == "{\"test\":\"test\"}");
}
SECTION("write_json in 2 chunks") {
Stream stream(io_executor, chunk_writer);
Stream stream(io_executor, string_writer);

nlohmann::json json = R"({
"check": "check",
Expand All @@ -75,7 +74,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
stream.write_json(json);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "1f\r\n{\"check\":\"check\",\"test\":\"test\"}\r\n0\r\n\r\n");
CHECK(string_writer.get_content() == "{\"check\":\"check\",\"test\":\"test\"}");
}
}

Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/test/api_test_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ void populate_blocks(db::RWTxn& txn, const std::filesystem::path& tests_dir, InM
class ChannelForTest : public Channel {
public:
Task<void> open_stream() override { co_return; }
Task<void> close_stream() override { co_return; }
Task<std::size_t> write(std::string_view /* content */) override { co_return 0; }
Task<void> write_rsp(const std::string& response) override {
response_ = response;
co_return;
}
Task<std::size_t> write(std::string_view /* content */) override { co_return 0; }
Task<void> close() override { co_return; }

const std::string& response() { return response_; }

private:
Expand Down
56 changes: 0 additions & 56 deletions silkworm/rpc/types/writer.cpp

This file was deleted.

23 changes: 6 additions & 17 deletions silkworm/rpc/types/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ class StreamWriter {
public:
virtual ~StreamWriter() = default;

virtual Task<void> open_stream() = 0;
virtual Task<void> close_stream() = 0;
virtual Task<std::size_t> write(std::string_view content) = 0;
virtual Task<void> close() = 0;
};

class StringWriter : public StreamWriter {
Expand All @@ -43,13 +44,15 @@ class StringWriter : public StreamWriter {
content_.reserve(initial_capacity);
}

Task<void> open_stream() override { co_return; }

Task<void> close_stream() override { co_return; }

Task<std::size_t> write(std::string_view content) override {
content_.append(content);
co_return content.size();
}

Task<void> close() override { co_return; }

const std::string& get_content() {
return content_;
}
Expand All @@ -58,18 +61,4 @@ class StringWriter : public StreamWriter {
std::string content_;
};

const std::string kChunkSep{'\r', '\n'}; // NOLINT(runtime/string)
const std::string kFinalChunk{'0', '\r', '\n', '\r', '\n'}; // NOLINT(runtime/string)

class ChunkWriter : public StreamWriter {
public:
explicit ChunkWriter(StreamWriter& writer);

Task<std::size_t> write(std::string_view content) override;
Task<void> close() override;

private:
StreamWriter& writer_;
};

} // namespace silkworm::rpc
Loading

0 comments on commit 850e3e7

Please sign in to comment.