From 850e3e7c1bcfa7be7cff8055354ab95adf2b1965 Mon Sep 17 00:00:00 2001 From: lupin012 <58134934+lupin012@users.noreply.github.com> Date: Fri, 9 Feb 2024 23:00:57 +0100 Subject: [PATCH] rpcdaemon: move HTTP chunk handling in http::Connection (#1811) --- silkworm/rpc/http/channel.hpp | 1 - silkworm/rpc/http/connection.cpp | 17 +++- silkworm/rpc/http/connection.hpp | 2 +- silkworm/rpc/http/request_handler.cpp | 5 +- silkworm/rpc/json/stream.cpp | 6 +- silkworm/rpc/json/stream.hpp | 2 + silkworm/rpc/json/stream_test.cpp | 9 +-- silkworm/rpc/test/api_test_database.hpp | 5 +- silkworm/rpc/types/writer.cpp | 56 ------------- silkworm/rpc/types/writer.hpp | 23 ++---- silkworm/rpc/types/writer_test.cpp | 101 ++++-------------------- silkworm/rpc/ws/connection.cpp | 6 +- silkworm/rpc/ws/connection.hpp | 2 +- 13 files changed, 59 insertions(+), 176 deletions(-) delete mode 100644 silkworm/rpc/types/writer.cpp diff --git a/silkworm/rpc/http/channel.hpp b/silkworm/rpc/http/channel.hpp index ef59861039..a22a3190ec 100644 --- a/silkworm/rpc/http/channel.hpp +++ b/silkworm/rpc/http/channel.hpp @@ -28,7 +28,6 @@ class Channel : public StreamWriter { Channel(const Channel&) = delete; Channel& operator=(const Channel&) = delete; - virtual Task open_stream() = 0; virtual Task write_rsp(const std::string& content) = 0; }; diff --git a/silkworm/rpc/http/connection.cpp b/silkworm/rpc/http/connection.cpp index 5e9463bcb0..03a90021f7 100644 --- a/silkworm/rpc/http/connection.cpp +++ b/silkworm/rpc/http/connection.cpp @@ -20,9 +20,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -154,12 +156,25 @@ Task Connection::open_stream() { } co_return; } +Task Connection::close_stream() { + try { + co_await boost::asio::async_write(socket_, boost::beast::http::make_chunk_last(), boost::asio::use_awaitable); + } catch (const boost::system::system_error& se) { + SILK_TRACE << "Connection::close system_error: " << se.what(); + throw; + } catch (const std::exception& e) { + SILK_ERROR << "Connection::close exception: " << e.what(); + throw; + } + co_return; +} //! Write chunked response content to the underlying socket Task Connection::write(std::string_view content) { unsigned long bytes_transferred{0}; try { - bytes_transferred = co_await boost::asio::async_write(socket_, boost::asio::buffer(content), boost::asio::use_awaitable); + boost::asio::const_buffer buffer{content.data(), content.size()}; + bytes_transferred = co_await boost::asio::async_write(socket_, boost::beast::http::chunk_body(buffer), boost::asio::use_awaitable); } catch (const boost::system::system_error& se) { SILK_TRACE << "Connection::write system_error: " << se.what(); throw; diff --git a/silkworm/rpc/http/connection.hpp b/silkworm/rpc/http/connection.hpp index 8c10d5dd77..69e37bdba0 100644 --- a/silkworm/rpc/http/connection.hpp +++ b/silkworm/rpc/http/connection.hpp @@ -60,7 +60,7 @@ class Connection : public Channel { Task read_loop(); Task open_stream() override; - Task close() override { co_return; } + Task close_stream() override; Task write(std::string_view content) override; Task write_rsp(const std::string& content) override; diff --git a/silkworm/rpc/http/request_handler.cpp b/silkworm/rpc/http/request_handler.cpp index 947059c954..f6b670a8ac 100644 --- a/silkworm/rpc/http/request_handler.cpp +++ b/silkworm/rpc/http/request_handler.cpp @@ -174,9 +174,8 @@ Task RequestHandler::handle_request(commands::RpcApiTable::HandleStream ha auto io_executor = co_await boost::asio::this_coro::executor; try { - co_await channel_->open_stream(); - ChunkWriter chunk_writer(*channel_); - json::Stream stream(io_executor, chunk_writer); + json::Stream stream(io_executor, *channel_); + co_await stream.open(); try { co_await (rpc_api_.*handler)(request_json, stream); diff --git a/silkworm/rpc/json/stream.cpp b/silkworm/rpc/json/stream.cpp index 09001e8af9..36d877b75e 100644 --- a/silkworm/rpc/json/stream.cpp +++ b/silkworm/rpc/json/stream.cpp @@ -73,6 +73,10 @@ Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std buffer_.reserve(buffer_capacity_ + buffer_capacity_ / 4); } +Task Stream::open() { + co_await writer_.open_stream(); +} + Task Stream::close() { if (!buffer_.empty()) { co_await do_async_write(std::make_shared(std::move(buffer_))); @@ -86,7 +90,7 @@ Task Stream::close() { co_await run_completion_channel_.async_receive(boost::asio::use_awaitable); #endif // _WIN32 - co_await writer_.close(); + co_await writer_.close_stream(); } void Stream::open_object() { diff --git a/silkworm/rpc/json/stream.hpp b/silkworm/rpc/json/stream.hpp index f176a6a170..2ab4f182df 100644 --- a/silkworm/rpc/json/stream.hpp +++ b/silkworm/rpc/json/stream.hpp @@ -43,6 +43,8 @@ class Stream { Stream(const Stream& stream) = delete; Stream& operator=(const Stream&) = delete; + Task open(); + //! Flush any remaining data and close properly as per the underlying transport Task close(); diff --git a/silkworm/rpc/json/stream_test.cpp b/silkworm/rpc/json/stream_test.cpp index bd639ffd94..2784a3ed84 100644 --- a/silkworm/rpc/json/stream_test.cpp +++ b/silkworm/rpc/json/stream_test.cpp @@ -38,7 +38,6 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") { boost::asio::any_io_executor io_executor = io_context_.get_executor(); StringWriter string_writer; - ChunkWriter chunk_writer(string_writer); SECTION("write_json in string") { Stream stream(io_executor, string_writer); @@ -53,7 +52,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") { CHECK(string_writer.get_content() == "{\"test\":\"test\"}"); } SECTION("write_json in 1 chunk") { - Stream stream(io_executor, chunk_writer); + Stream stream(io_executor, string_writer); nlohmann::json json = R"({ "test": "test" @@ -62,10 +61,10 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") { stream.write_json(json); spawn_and_wait(stream.close()); - CHECK(string_writer.get_content() == "f\r\n{\"test\":\"test\"}\r\n0\r\n\r\n"); + CHECK(string_writer.get_content() == "{\"test\":\"test\"}"); } SECTION("write_json in 2 chunks") { - Stream stream(io_executor, chunk_writer); + Stream stream(io_executor, string_writer); nlohmann::json json = R"({ "check": "check", @@ -75,7 +74,7 @@ TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") { stream.write_json(json); spawn_and_wait(stream.close()); - CHECK(string_writer.get_content() == "1f\r\n{\"check\":\"check\",\"test\":\"test\"}\r\n0\r\n\r\n"); + CHECK(string_writer.get_content() == "{\"check\":\"check\",\"test\":\"test\"}"); } } diff --git a/silkworm/rpc/test/api_test_database.hpp b/silkworm/rpc/test/api_test_database.hpp index 0837b9a208..4f2bb83af1 100644 --- a/silkworm/rpc/test/api_test_database.hpp +++ b/silkworm/rpc/test/api_test_database.hpp @@ -55,13 +55,12 @@ void populate_blocks(db::RWTxn& txn, const std::filesystem::path& tests_dir, InM class ChannelForTest : public Channel { public: Task open_stream() override { co_return; } + Task close_stream() override { co_return; } + Task write(std::string_view /* content */) override { co_return 0; } Task write_rsp(const std::string& response) override { response_ = response; co_return; } - Task write(std::string_view /* content */) override { co_return 0; } - Task close() override { co_return; } - const std::string& response() { return response_; } private: diff --git a/silkworm/rpc/types/writer.cpp b/silkworm/rpc/types/writer.cpp deleted file mode 100644 index 07ab88dab5..0000000000 --- a/silkworm/rpc/types/writer.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright 2023 The Silkworm Authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#include "writer.hpp" - -#include -#include - -#include - -namespace silkworm::rpc { - -ChunkWriter::ChunkWriter(StreamWriter& writer) : writer_(writer) {} - -Task ChunkWriter::write(std::string_view content) { - auto size = content.size(); - std::array str{}; - - std::size_t written{0}; - if (auto [ptr, ec] = std::to_chars(str.data(), str.data() + str.size(), size, 16); ec == std::errc()) { - auto view = std::string_view(str.data(), ptr); - - std::string chunk(view.size() + 2 * kChunkSep.size() + content.size(), '\0'); - chunk = view; - chunk += kChunkSep; - chunk += content; - chunk += kChunkSep; - written = co_await writer_.write(chunk); - } else { - SILK_ERROR << "Invalid conversion for size " << size; - } - - co_return written; -} - -Task ChunkWriter::close() { - co_await writer_.write(kFinalChunk); - co_await writer_.close(); - - co_return; -} - -} // namespace silkworm::rpc diff --git a/silkworm/rpc/types/writer.hpp b/silkworm/rpc/types/writer.hpp index 69741460da..7858d2a054 100644 --- a/silkworm/rpc/types/writer.hpp +++ b/silkworm/rpc/types/writer.hpp @@ -31,8 +31,9 @@ class StreamWriter { public: virtual ~StreamWriter() = default; + virtual Task open_stream() = 0; + virtual Task close_stream() = 0; virtual Task write(std::string_view content) = 0; - virtual Task close() = 0; }; class StringWriter : public StreamWriter { @@ -43,13 +44,15 @@ class StringWriter : public StreamWriter { content_.reserve(initial_capacity); } + Task open_stream() override { co_return; } + + Task close_stream() override { co_return; } + Task write(std::string_view content) override { content_.append(content); co_return content.size(); } - Task close() override { co_return; } - const std::string& get_content() { return content_; } @@ -58,18 +61,4 @@ class StringWriter : public StreamWriter { std::string content_; }; -const std::string kChunkSep{'\r', '\n'}; // NOLINT(runtime/string) -const std::string kFinalChunk{'0', '\r', '\n', '\r', '\n'}; // NOLINT(runtime/string) - -class ChunkWriter : public StreamWriter { - public: - explicit ChunkWriter(StreamWriter& writer); - - Task write(std::string_view content) override; - Task close() override; - - private: - StreamWriter& writer_; -}; - } // namespace silkworm::rpc diff --git a/silkworm/rpc/types/writer_test.cpp b/silkworm/rpc/types/writer_test.cpp index c49f76cde6..049ad6b372 100644 --- a/silkworm/rpc/types/writer_test.cpp +++ b/silkworm/rpc/types/writer_test.cpp @@ -32,8 +32,9 @@ class JsonChunkWriter : public StreamWriter { public: explicit JsonChunkWriter(StreamWriter& writer, std::size_t chunk_size = kDefaultChunkSize); + Task open_stream() override { co_return; } + Task close_stream() override; Task write(std::string_view content) override; - Task close() override; private: static const std::size_t kDefaultChunkSize = 0x800; @@ -43,12 +44,10 @@ class JsonChunkWriter : public StreamWriter { const std::size_t chunk_size_; size_t room_left_in_chunk_; std::size_t written_{0}; - std::stringstream str_chunk_size_; }; JsonChunkWriter::JsonChunkWriter(StreamWriter& writer, std::size_t chunk_size) : writer_(writer), chunk_size_(chunk_size), room_left_in_chunk_(chunk_size_) { - str_chunk_size_ << std::hex << chunk_size_ << kChunkSep; } Task JsonChunkWriter::write(std::string_view content) { @@ -57,7 +56,6 @@ Task JsonChunkWriter::write(std::string_view content) { SILK_DEBUG << "JsonChunkWriter::write written_: " << written_ << " size: " << size; if (!chunk_open_) { - co_await writer_.write(str_chunk_size_.str()); chunk_open_ = true; } @@ -75,12 +73,10 @@ Task JsonChunkWriter::write(std::string_view content) { if ((room_left_in_chunk_ % chunk_size_) == 0) { if (chunk_open_) { - co_await writer_.write(kChunkSep); room_left_in_chunk_ = chunk_size_; chunk_open_ = false; } if (remaining_in_view > 0) { - co_await writer_.write(str_chunk_size_.str()); chunk_open_ = true; } } @@ -88,20 +84,17 @@ Task JsonChunkWriter::write(std::string_view content) { co_return content.size(); } -Task JsonChunkWriter::close() { +Task JsonChunkWriter::close_stream() { if (chunk_open_) { if (room_left_in_chunk_ > 0) { std::unique_ptr buffer{new char[room_left_in_chunk_]}; std::memset(buffer.get(), ' ', room_left_in_chunk_); co_await writer_.write(std::string_view(buffer.get(), room_left_in_chunk_)); } - co_await writer_.write(kChunkSep); chunk_open_ = false; room_left_in_chunk_ = chunk_size_; } - co_await writer_.write(kFinalChunk); - co_return; } @@ -114,121 +107,61 @@ TEST_CASE_METHOD(WriterTest, "StringWriter") { CHECK(writer.get_content() == test); } - SECTION("close") { + SECTION("close_stream") { StringWriter writer(5); std::string test = "test"; spawn_and_wait(writer.write(test)); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); CHECK(writer.get_content() == test); } } -TEST_CASE_METHOD(WriterTest, "ChunkWriter") { - SECTION("write&close under chunk size") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.write("1234")); - spawn_and_wait(writer.close()); - - CHECK(s_writer.get_content() == "4\r\n1234\r\n0\r\n\r\n"); - } - SECTION("write over chunk size 4") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.write("1234")); - spawn_and_wait(writer.write("5678")); - - CHECK(s_writer.get_content() == "4\r\n1234\r\n4\r\n5678\r\n"); - } - SECTION("write&close over chunk size 4") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.write("1234")); - spawn_and_wait(writer.write("5678")); - spawn_and_wait(writer.write("90")); - spawn_and_wait(writer.close()); - - CHECK(s_writer.get_content() == "4\r\n1234\r\n4\r\n5678\r\n2\r\n90\r\n0\r\n\r\n"); - } - SECTION("write over chunk size 5") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.write("12345")); - spawn_and_wait(writer.write("67890")); - - CHECK(s_writer.get_content() == "5\r\n12345\r\n5\r\n67890\r\n"); - } - SECTION("write&close over chunk size 5") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.write("12345")); - spawn_and_wait(writer.write("67890")); - spawn_and_wait(writer.write("12")); - spawn_and_wait(writer.close()); - - CHECK(s_writer.get_content() == "5\r\n12345\r\n5\r\n67890\r\n2\r\n12\r\n0\r\n\r\n"); - } - SECTION("close") { - StringWriter s_writer; - ChunkWriter writer(s_writer); - - spawn_and_wait(writer.close()); - - CHECK(s_writer.get_content() == "0\r\n\r\n"); - } -} - TEST_CASE_METHOD(WriterTest, "JsonChunkWriter") { SECTION("write&close under chunk size") { StringWriter s_writer; JsonChunkWriter writer(s_writer, 16); spawn_and_wait(writer.write("1234")); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "10\r\n1234 \r\n0\r\n\r\n"); + CHECK(s_writer.get_content() == "1234 "); } SECTION("write&close over chunk size 4") { StringWriter s_writer; JsonChunkWriter writer(s_writer, 4); spawn_and_wait(writer.write("1234567890")); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "4\r\n1234\r\n4\r\n5678\r\n4\r\n90 \r\n0\r\n\r\n"); + CHECK(s_writer.get_content() == "1234567890 "); } SECTION("write&close over chunk size 5") { StringWriter s_writer; JsonChunkWriter writer(s_writer, 5); spawn_and_wait(writer.write("1234567890")); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "5\r\n12345\r\n5\r\n67890\r\n0\r\n\r\n"); + CHECK(s_writer.get_content() == "1234567890"); } SECTION("write&close over chunk size 5") { StringWriter s_writer; JsonChunkWriter writer(s_writer, 5); spawn_and_wait(writer.write("123456789012")); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "5\r\n12345\r\n5\r\n67890\r\n5\r\n12 \r\n0\r\n\r\n"); + CHECK(s_writer.get_content() == "123456789012 "); } SECTION("close") { StringWriter s_writer; JsonChunkWriter writer(s_writer); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "0\r\n\r\n"); + CHECK(s_writer.get_content() == ""); } SECTION("write json") { StringWriter s_writer; @@ -243,9 +176,9 @@ TEST_CASE_METHOD(WriterTest, "JsonChunkWriter") { const auto content = json.dump(/*indent=*/-1, /*indent_char=*/' ', /*ensure_ascii=*/false, nlohmann::json::error_handler_t::replace); spawn_and_wait(writer.write(content)); - spawn_and_wait(writer.close()); + spawn_and_wait(writer.close_stream()); - CHECK(s_writer.get_content() == "30\r\n{\"accounts\":{},\"next\":\"next\",\"root\":\"root\"} \r\n0\r\n\r\n"); + CHECK(s_writer.get_content() == "{\"accounts\":{},\"next\":\"next\",\"root\":\"root\"} "); } } diff --git a/silkworm/rpc/ws/connection.cpp b/silkworm/rpc/ws/connection.cpp index 8a4b80c77e..94999d3cee 100644 --- a/silkworm/rpc/ws/connection.cpp +++ b/silkworm/rpc/ws/connection.cpp @@ -43,7 +43,7 @@ Connection::~Connection() { } Task Connection::accept(const boost::beast::http::request& req) { - // Set suggested timeout settings for the websocket + // Set timeout settings for the websocket boost::beast::websocket::stream_base::timeout timeout{ .handshake_timeout = std::chrono::seconds(30), .idle_timeout = std::chrono::seconds(60), @@ -52,11 +52,11 @@ Task Connection::accept(const boost::beast::http::request open_stream() override { co_return; } - Task close() override { co_return; } + Task close_stream() override { co_return; } Task write_rsp(const std::string& content) override; Task write(std::string_view content) override;