Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcdaemon: move HTTP chunk handling in http::Connection #1811

Merged
merged 6 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion silkworm/rpc/http/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class Channel : public StreamWriter {
Channel(const Channel&) = delete;
Channel& operator=(const Channel&) = delete;

virtual Task<void> open_stream() = 0;
virtual Task<void> write_rsp(const std::string& content) = 0;
};

Expand Down
17 changes: 16 additions & 1 deletion silkworm/rpc/http/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <string_view>

#include <absl/strings/str_join.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/http/chunk_encode.hpp>
#include <boost/beast/http/write.hpp>
#include <jwt-cpp/jwt.h>
#include <jwt-cpp/traits/nlohmann-json/defaults.h>
Expand Down Expand Up @@ -154,12 +156,25 @@ Task<void> Connection::open_stream() {
}
co_return;
}
Task<void> Connection::close_stream() {
try {
co_await boost::asio::async_write(socket_, boost::beast::http::make_chunk_last(), boost::asio::use_awaitable);
} catch (const boost::system::system_error& se) {
SILK_TRACE << "Connection::close system_error: " << se.what();
throw;
} catch (const std::exception& e) {
SILK_ERROR << "Connection::close exception: " << e.what();
throw;
}
co_return;
}

//! Write chunked response content to the underlying socket
Task<std::size_t> Connection::write(std::string_view content) {
unsigned long bytes_transferred{0};
try {
bytes_transferred = co_await boost::asio::async_write(socket_, boost::asio::buffer(content), boost::asio::use_awaitable);
boost::asio::const_buffer buffer{content.data(), content.size()};
bytes_transferred = co_await boost::asio::async_write(socket_, boost::beast::http::chunk_body(buffer), boost::asio::use_awaitable);
} catch (const boost::system::system_error& se) {
SILK_TRACE << "Connection::write system_error: " << se.what();
throw;
Expand Down
2 changes: 1 addition & 1 deletion silkworm/rpc/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Connection : public Channel {
Task<void> read_loop();

Task<void> open_stream() override;
Task<void> close() override { co_return; }
Task<void> close_stream() override;

Task<std::size_t> write(std::string_view content) override;
Task<void> write_rsp(const std::string& content) override;
Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/http/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ Task<void> RequestHandler::handle_request(commands::RpcApiTable::HandleStream ha
auto io_executor = co_await boost::asio::this_coro::executor;

try {
co_await channel_->open_stream();
ChunkWriter chunk_writer(*channel_);
json::Stream stream(io_executor, chunk_writer);
json::Stream stream(io_executor, *channel_);
co_await stream.open();

try {
co_await (rpc_api_.*handler)(request_json, stream);
Expand Down
6 changes: 5 additions & 1 deletion silkworm/rpc/json/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std
buffer_.reserve(buffer_capacity_ + buffer_capacity_ / 4);
}

Task<void> Stream::open() {
co_await writer_.open_stream();
}

Task<void> Stream::close() {
if (!buffer_.empty()) {
co_await do_async_write(std::make_shared<std::string>(std::move(buffer_)));
Expand All @@ -86,7 +90,7 @@ Task<void> Stream::close() {
co_await run_completion_channel_.async_receive(boost::asio::use_awaitable);
#endif // _WIN32

co_await writer_.close();
co_await writer_.close_stream();
}

void Stream::open_object() {
Expand Down
2 changes: 2 additions & 0 deletions silkworm/rpc/json/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Stream {
Stream(const Stream& stream) = delete;
Stream& operator=(const Stream&) = delete;

Task<void> open();

//! Flush any remaining data and close properly as per the underlying transport
Task<void> close();

Expand Down
9 changes: 4 additions & 5 deletions silkworm/rpc/json/stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
boost::asio::any_io_executor io_executor = io_context_.get_executor();

StringWriter string_writer;
ChunkWriter chunk_writer(string_writer);

SECTION("write_json in string") {
Stream stream(io_executor, string_writer);
Expand All @@ -53,7 +52,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
CHECK(string_writer.get_content() == "{\"test\":\"test\"}");
}
SECTION("write_json in 1 chunk") {
Stream stream(io_executor, chunk_writer);
Stream stream(io_executor, string_writer);

nlohmann::json json = R"({
"test": "test"
Expand All @@ -62,10 +61,10 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
stream.write_json(json);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "f\r\n{\"test\":\"test\"}\r\n0\r\n\r\n");
CHECK(string_writer.get_content() == "{\"test\":\"test\"}");
}
SECTION("write_json in 2 chunks") {
Stream stream(io_executor, chunk_writer);
Stream stream(io_executor, string_writer);

nlohmann::json json = R"({
"check": "check",
Expand All @@ -75,7 +74,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
stream.write_json(json);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "1f\r\n{\"check\":\"check\",\"test\":\"test\"}\r\n0\r\n\r\n");
CHECK(string_writer.get_content() == "{\"check\":\"check\",\"test\":\"test\"}");
}
}

Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/test/api_test_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ void populate_blocks(db::RWTxn& txn, const std::filesystem::path& tests_dir, InM
class ChannelForTest : public Channel {
public:
Task<void> open_stream() override { co_return; }
Task<void> close_stream() override { co_return; }
Task<std::size_t> write(std::string_view /* content */) override { co_return 0; }
Task<void> write_rsp(const std::string& response) override {
response_ = response;
co_return;
}
Task<std::size_t> write(std::string_view /* content */) override { co_return 0; }
Task<void> close() override { co_return; }

const std::string& response() { return response_; }

private:
Expand Down
56 changes: 0 additions & 56 deletions silkworm/rpc/types/writer.cpp

This file was deleted.

23 changes: 6 additions & 17 deletions silkworm/rpc/types/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ class StreamWriter {
public:
virtual ~StreamWriter() = default;

virtual Task<void> open_stream() = 0;
virtual Task<void> close_stream() = 0;
virtual Task<std::size_t> write(std::string_view content) = 0;
virtual Task<void> close() = 0;
};

class StringWriter : public StreamWriter {
Expand All @@ -43,13 +44,15 @@ class StringWriter : public StreamWriter {
content_.reserve(initial_capacity);
}

Task<void> open_stream() override { co_return; }

Task<void> close_stream() override { co_return; }

Task<std::size_t> write(std::string_view content) override {
content_.append(content);
co_return content.size();
}

Task<void> close() override { co_return; }

const std::string& get_content() {
return content_;
}
Expand All @@ -58,18 +61,4 @@ class StringWriter : public StreamWriter {
std::string content_;
};

const std::string kChunkSep{'\r', '\n'}; // NOLINT(runtime/string)
const std::string kFinalChunk{'0', '\r', '\n', '\r', '\n'}; // NOLINT(runtime/string)

class ChunkWriter : public StreamWriter {
public:
explicit ChunkWriter(StreamWriter& writer);

Task<std::size_t> write(std::string_view content) override;
Task<void> close() override;

private:
StreamWriter& writer_;
};

} // namespace silkworm::rpc
Loading
Loading