Skip to content

Commit

Permalink
Merge pull request #193 from project-tsurugi/wip/reset
Browse files Browse the repository at this point in the history
Wip/reset
  • Loading branch information
t-horikawa authored Feb 23, 2024
2 parents fca47f4 + e049601 commit c0c27e5
Show file tree
Hide file tree
Showing 31 changed files with 155 additions and 638 deletions.
59 changes: 0 additions & 59 deletions include/tateyama/session/core.h

This file was deleted.

40 changes: 0 additions & 40 deletions src/tateyama/endpoint/common/response.h

This file was deleted.

33 changes: 25 additions & 8 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

#include <future>
#include <thread>
#include <memory>
#include <functional>
#include <map>
#include <vector>
#include <mutex>

#include <tateyama/status.h>
#include <tateyama/api/server/request.h>
Expand All @@ -31,13 +28,12 @@
#include <tateyama/status/resource/bridge.h>
#include <tateyama/logging_helper.h>
#include <tateyama/session/resource/bridge.h>
#include <tateyama/session/variable_set.h>
#include <tateyama/session/resource/variable_set.h>

#include <tateyama/proto/endpoint/request.pb.h>
#include <tateyama/proto/endpoint/response.pb.h>
#include <tateyama/proto/diagnostics.pb.h>

#include "response.h"
#include "tateyama/endpoint/common/session_info_impl.h"

namespace tateyama::endpoint::common {
Expand All @@ -61,12 +57,19 @@ class worker_common {
stream,
};

worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, [[maybe_unused]] const std::shared_ptr<tateyama::session::resource::bridge>& session)
worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, std::shared_ptr<tateyama::session::resource::bridge> session)
: connection_type_(con),
session_id_(session_id),
session_info_(session_id, connection_label(con), conn_info) {
session_info_(session_id, connection_label(con), conn_info),
session_(std::move(session)),
session_variable_set_(variable_declarations()),
session_context_(std::make_shared<tateyama::session::resource::session_context>(session_info_, session_variable_set_))
{
if (session_) {
session_->register_session(session_context_);
}
}
worker_common(connection_type con, std::size_t id, const std::shared_ptr<tateyama::session::resource::bridge>& session) : worker_common(con, id, "", session) {
worker_common(connection_type con, std::size_t id, std::shared_ptr<tateyama::session::resource::bridge> session) : worker_common(con, id, "", std::move(session)) {
}
void invoke(std::function<void(void)> func) {
task_ = std::packaged_task<void()>(std::move(func));
Expand All @@ -81,6 +84,7 @@ class worker_common {
const connection_type connection_type_; // NOLINT
const std::size_t session_id_; // NOLINT
session_info_impl session_info_; // NOLINT

// for ipc endpoint only
std::string connection_info_{}; // NOLINT
// for stream endpoint only
Expand All @@ -91,6 +95,9 @@ class worker_common {
std::future<void> future_; // NOLINT
std::thread thread_{}; // NOLINT

// for session management
const std::shared_ptr<tateyama::session::resource::bridge> session_; // NOLINT

bool handshake(tateyama::api::server::request* req, tateyama::api::server::response* res) {
if (req->service_id() != tateyama::framework::service_id_endpoint_broker) {
LOG(INFO) << "request received is not handshake";
Expand Down Expand Up @@ -171,6 +178,9 @@ class worker_common {
}

private:
tateyama::session::resource::session_variable_set session_variable_set_;
const std::shared_ptr<tateyama::session::resource::session_context> session_context_;

std::string_view connection_label(connection_type con) {
switch (con) {
case connection_type::ipc:
Expand All @@ -181,6 +191,13 @@ class worker_common {
return "";
}
}

[[nodiscard]] std::vector<std::tuple<std::string, tateyama::session::resource::session_variable_set::variable_type, tateyama::session::resource::session_variable_set::value_type>> variable_declarations() const noexcept {
return {
{ "example_integer", tateyama::session::resource::session_variable_type::signed_integer, static_cast<std::int64_t>(0) }
};
}

};

} // tateyama::endpoint::common
1 change: 1 addition & 0 deletions src/tateyama/endpoint/ipc/ipc_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ipc_response : public tateyama::api::server::response {
[[nodiscard]] bool check_cancel() const override {
return false;
}

private:
std::shared_ptr<server_wire_container> server_wire_;
std::size_t index_;
Expand Down
11 changes: 9 additions & 2 deletions src/tateyama/endpoint/loopback/loopback_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#include <map>
#include <mutex>

#include <tateyama/endpoint/common/response.h>
#include <tateyama/api/server/response.h>

#include "loopback_data_writer.h"
#include "loopback_data_channel.h"

namespace tateyama::endpoint::loopback {

class loopback_response: public tateyama::endpoint::common::response {
class loopback_response: public tateyama::api::server::response {
public:
/**
* @see tateyama::server::response::session_id()
Expand Down Expand Up @@ -84,6 +84,13 @@ class loopback_response: public tateyama::endpoint::common::response {
*/
tateyama::status release_channel(tateyama::api::server::data_channel &ch) override;

/**
* @see tateyama::server::response::check_cancel()
*/
[[nodiscard]] bool check_cancel() const override {
return false;
}

// just for unit test
[[nodiscard]] std::map<std::string, std::vector<std::string>, std::less<>> const& all_committed_data() const noexcept {
return committed_data_map_;
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/stream_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

namespace tateyama::endpoint::stream {

// class stream_response
class stream_request;

// class stream_response
Expand Down Expand Up @@ -126,6 +125,7 @@ tateyama::status stream_response::release_channel(tateyama::api::server::data_ch
return tateyama::status::unknown;
}


// class stream_data_channel
tateyama::status stream_data_channel::acquire(std::shared_ptr<tateyama::api::server::writer>& wrt) {
try {
Expand Down
2 changes: 2 additions & 0 deletions src/tateyama/endpoint/stream/stream_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <atomic>

#include <tateyama/api/server/response.h>

#include "tateyama/endpoint/common/pointer_comp.h"
#include "stream.h"

Expand Down Expand Up @@ -93,6 +94,7 @@ class stream_response : public tateyama::api::server::response {
[[nodiscard]] bool check_cancel() const override {
return false;
}

private:
std::shared_ptr<stream_socket> stream_;
std::uint16_t index_;
Expand Down
10 changes: 1 addition & 9 deletions src/tateyama/proto/endpoint/request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ message Request {
oneof command {
// handshake operation.
Handshake handshake = 11;

// cancel operation.
Cancel cancel = 12;
}
reserved 13 to 99;
reserved 12 to 99;
}

// handshake operation.
Expand Down Expand Up @@ -76,8 +73,3 @@ message WireInformation {
uint64 maximum_concurrent_result_sets = 1;
}
}

// cancel operation.
message Cancel {
// no special properties.
}
Loading

0 comments on commit c0c27e5

Please sign in to comment.