Skip to content

Commit

Permalink
Add some threading checking tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Oipo committed Nov 5, 2022
1 parent 529dcdd commit af105a2
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 23 deletions.
10 changes: 0 additions & 10 deletions examples/http_example/UsingHttpService.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class UsingHttpService final : public Service<UsingHttpService> {
private:
StartBehaviour start() final {
ICHOR_LOG_INFO(_logger, "UsingHttpService started");
_failureEventRegistration = getManager().registerEventHandler<FailedSendMessageEvent>(this);

getManager().pushEvent<RunFunctionEvent>(getServiceId(), [this](DependencyManager &dm) -> AsyncGenerator<void> {
auto toSendMsg = _serializationAdmin->serialize(TestMsg{11, "hello"});
Expand All @@ -40,8 +39,6 @@ class UsingHttpService final : public Service<UsingHttpService> {
}

StartBehaviour stop() final {
_dataEventRegistration.reset();
_failureEventRegistration.reset();
_routeRegistration.reset();
ICHOR_LOG_INFO(_logger, "UsingHttpService stopped");
return StartBehaviour::SUCCEEDED;
Expand Down Expand Up @@ -88,11 +85,6 @@ class UsingHttpService final : public Service<UsingHttpService> {
ICHOR_LOG_INFO(_logger, "Removed IHttpConnectionService");
}

AsyncGenerator<void> handleEvent(FailedSendMessageEvent const &evt) {
ICHOR_LOG_INFO(_logger, "Failed to send message id {}, retrying", evt.msgId);
return sendTestRequest(std::move(evt.data));
}

friend DependencyRegister;
friend DependencyManager;

Expand All @@ -114,7 +106,5 @@ class UsingHttpService final : public Service<UsingHttpService> {
ILogger *_logger{nullptr};
ISerializationAdmin *_serializationAdmin{nullptr};
IHttpConnectionService *_connectionService{nullptr};
EventHandlerRegistration _dataEventRegistration{};
EventHandlerRegistration _failureEventRegistration{};
std::unique_ptr<HttpRouteRegistration> _routeRegistration{nullptr};
};
11 changes: 0 additions & 11 deletions examples/http_ping_pong/PingService.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class PingService final : public Service<PingService> {
private:
StartBehaviour start() final {
ICHOR_LOG_INFO(_logger, "PingService started");
_failureEventRegistration = getManager().registerEventHandler<FailedSendMessageEvent>(this);

_timer = getManager().createServiceManager<Timer, ITimer>();
_timer->setCallback(this, [this](DependencyManager &dm) -> AsyncGenerator<void> {
Expand Down Expand Up @@ -55,8 +54,6 @@ class PingService final : public Service<PingService> {

StartBehaviour stop() final {
_timer->stopTimer();
_dataEventRegistration.reset();
_failureEventRegistration.reset();
ICHOR_LOG_INFO(_logger, "PingService stopped");
return StartBehaviour::SUCCEEDED;
}
Expand Down Expand Up @@ -88,12 +85,6 @@ class PingService final : public Service<PingService> {
ICHOR_LOG_INFO(_logger, "Removed IHttpConnectionService");
}

AsyncGenerator<void> handleEvent(FailedSendMessageEvent const &evt) {
++_failed;
ICHOR_LOG_INFO(_logger, "Failed to send message id {}, total failed {}", evt.msgId, _failed);
co_return;
}

friend DependencyRegister;
friend DependencyManager;

Expand All @@ -114,8 +105,6 @@ class PingService final : public Service<PingService> {
Timer *_timer{nullptr};
ISerializationAdmin *_serializationAdmin{nullptr};
IHttpConnectionService *_connectionService{nullptr};
EventHandlerRegistration _dataEventRegistration{};
EventHandlerRegistration _failureEventRegistration{};
uint64_t _sequence{};
uint64_t _failed{};
};
4 changes: 2 additions & 2 deletions include/ichor/services/network/http/IHttpConnectionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace Ichor {
public:

/**
* Send message asynchronously to the connected http server. An HttpResponseEvent is queued when the request finishes. In case of failure, pushes a FailedSendMessageEvent
* Send message asynchronously to the connected http server. Use co_await to get the response.
* @param method method type (GET, POST, etc)
* @param route The route, or path, of this request. Has to be pointing to valid memory until an HttpResponseEvent is received.
* @param msg Usually json, ignored for GET requests
* @return id of the request, will be available in the HttpResponseEvent
* @return response
*/
virtual AsyncGenerator<HttpResponse> sendAsync(HttpMethod method, std::string_view route, std::vector<HttpHeader> &&headers, std::vector<uint8_t>&& msg) = 0;

Expand Down
57 changes: 57 additions & 0 deletions test/DependencyManagerTests.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <ichor/event_queues/MultimapQueue.h>
#include <ichor/events/RunFunctionEvent.h>
#include <ichor/coroutines/AsyncManualResetEvent.h>
#include "TestServices/UselessService.h"
#include "Common.h"

Expand Down Expand Up @@ -64,4 +66,59 @@ TEST_CASE("DependencyManager") {

REQUIRE_FALSE(dm.isRunning());
}

SECTION("DependencyManager", "RunFunctionEvent thread") {
auto queue = std::make_unique<MultimapQueue>();
auto &dm = queue->createManager();
std::thread::id testThreadId = std::this_thread::get_id();
std::thread::id dmThreadId;

REQUIRE(Ichor::Detail::_local_dm == nullptr);

std::thread t([&]() {
dmThreadId = std::this_thread::get_id();
dm.createServiceManager<CoutFrameworkLogger, IFrameworkLogger>();
dm.createServiceManager<UselessService>();
queue->start(CaptureSigInt);
});

dm.runForOrQueueEmpty();

REQUIRE(dm.isRunning());

REQUIRE(Ichor::Detail::_local_dm == nullptr);

AsyncManualResetEvent evt;

dm.pushEvent<RunFunctionEvent>(0, [&](DependencyManager &_dm) -> AsyncGenerator<void> {
REQUIRE(Ichor::Detail::_local_dm == &_dm);
REQUIRE(Ichor::Detail::_local_dm == &dm);
REQUIRE(testThreadId != std::this_thread::get_id());
REQUIRE(dmThreadId == std::this_thread::get_id());
co_await evt;
REQUIRE(Ichor::Detail::_local_dm == &_dm);
REQUIRE(Ichor::Detail::_local_dm == &dm);
REQUIRE(testThreadId != std::this_thread::get_id());
REQUIRE(dmThreadId == std::this_thread::get_id());
dm.pushEvent<QuitEvent>(0);
co_return;
});

dm.runForOrQueueEmpty();

REQUIRE(Ichor::Detail::_local_dm == nullptr);

dm.pushEvent<RunFunctionEvent>(0, [&](DependencyManager &_dm) -> AsyncGenerator<void> {
REQUIRE(Ichor::Detail::_local_dm == &_dm);
REQUIRE(Ichor::Detail::_local_dm == &dm);
REQUIRE(testThreadId != std::this_thread::get_id());
REQUIRE(dmThreadId == std::this_thread::get_id());
evt.set();
co_return;
});

t.join();

REQUIRE_FALSE(dm.isRunning());
}
}
65 changes: 65 additions & 0 deletions test/HttpTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifdef ICHOR_USE_BOOST_BEAST

#include <ichor/event_queues/MultimapQueue.h>
#include <ichor/events/RunFunctionEvent.h>
#include <ichor/coroutines/AsyncManualResetEvent.h>
#include <ichor/services/logging/LoggerAdmin.h>
#include <ichor/services/network/http/HttpHostService.h>
#include <ichor/services/network/http/HttpConnectionService.h>
#include <ichor/services/network/http/HttpContextService.h>
#include <ichor/services/network/ClientAdmin.h>
#include <ichor/services/serialization/SerializationAdmin.h>
#include <ichor/services/logging/CoutFrameworkLogger.h>
#include <ichor/services/logging/CoutLogger.h>
#include "Common.h"
#include "TestServices/HttpThreadService.h"
#include "../examples/common/TestMsgJsonSerializer.h"

using namespace Ichor;

std::unique_ptr<Ichor::AsyncManualResetEvent> _evt;
std::thread::id testThreadId;
std::thread::id dmThreadId;
bool evtGate;

TEST_CASE("HttpTests") {
SECTION("Http events on same thread") {
testThreadId = std::this_thread::get_id();
_evt = std::make_unique<Ichor::AsyncManualResetEvent>();
auto queue = std::make_unique<MultimapQueue>();
auto &dm = queue->createManager();
evtGate = false;

std::thread t([&]() {
dmThreadId = std::this_thread::get_id();

dm.createServiceManager<CoutFrameworkLogger, IFrameworkLogger>({}, 10);
dm.createServiceManager<LoggerAdmin<CoutLogger>, ILoggerAdmin>();
dm.createServiceManager<SerializationAdmin, ISerializationAdmin>();
dm.createServiceManager<TestMsgJsonSerializer, ISerializer>();
dm.createServiceManager<HttpContextService, IHttpContextService>();
dm.createServiceManager<HttpHostService, IHttpService>(Properties{{"Address", Ichor::make_any<std::string>("127.0.0.1")}, {"Port", Ichor::make_any<uint16_t>(8001)}});
dm.createServiceManager<ClientAdmin<HttpConnectionService, IHttpConnectionService>, IClientAdmin>();
dm.createServiceManager<HttpThreadService>(Properties{{"Address", Ichor::make_any<std::string>("127.0.0.1")}, {"Port", Ichor::make_any<uint16_t>(8001)}});

queue->start(CaptureSigInt);
});

while(!evtGate) {
std::this_thread::sleep_for(1ms);
}

dm.pushEvent<RunFunctionEvent>(0, [&](DependencyManager &_dm) -> AsyncGenerator<void> {
REQUIRE(Ichor::Detail::_local_dm == &_dm);
REQUIRE(Ichor::Detail::_local_dm == &dm);
REQUIRE(testThreadId != std::this_thread::get_id());
REQUIRE(dmThreadId == std::this_thread::get_id());
_evt->set();
co_return;
});

t.join();
}
}

#endif
126 changes: 126 additions & 0 deletions test/TestServices/HttpThreadService.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#pragma once

#include <ichor/DependencyManager.h>
#include <ichor/services/logging/Logger.h>
#include <ichor/services/timer/TimerService.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/http/IHttpConnectionService.h>
#include <ichor/services/network/http/IHttpService.h>
#include <ichor/events/RunFunctionEvent.h>
#include <ichor/Service.h>
#include <ichor/LifecycleManager.h>
#include <ichor/services/serialization/ISerializationAdmin.h>
#include "../examples/common/TestMsg.h"

using namespace Ichor;

extern std::unique_ptr<Ichor::AsyncManualResetEvent> _evt;
extern bool evtGate;
extern std::thread::id testThreadId;
extern std::thread::id dmThreadId;

class HttpThreadService final : public Service<HttpThreadService> {
public:
HttpThreadService(DependencyRegister &reg, Properties props, DependencyManager *mng) : Service(std::move(props), mng) {
reg.registerDependency<ISerializationAdmin>(this, true);
reg.registerDependency<IHttpConnectionService>(this, true, getProperties());
reg.registerDependency<IHttpService>(this, true);
}
~HttpThreadService() final = default;

private:
StartBehaviour start() final {
getManager().pushEvent<RunFunctionEvent>(getServiceId(), [this](DependencyManager &dm) -> AsyncGenerator<void> {
auto toSendMsg = _serializationAdmin->serialize(TestMsg{11, "hello"});

if(dmThreadId != std::this_thread::get_id()) {
throw std::runtime_error("dmThreadId id incorrect");
}
if(testThreadId == std::this_thread::get_id()) {
throw std::runtime_error("testThreadId id incorrect");
}

co_await sendTestRequest(std::move(toSendMsg)).begin();

if(dmThreadId != std::this_thread::get_id()) {
throw std::runtime_error("dmThreadId id incorrect");
}
if(testThreadId == std::this_thread::get_id()) {
throw std::runtime_error("testThreadId id incorrect");
}

co_return;
});

return StartBehaviour::SUCCEEDED;
}

StartBehaviour stop() final {
_routeRegistration.reset();
return StartBehaviour::SUCCEEDED;
}

void addDependencyInstance(ISerializationAdmin *serializationAdmin, IService *) {
_serializationAdmin = serializationAdmin;
}

void removeDependencyInstance(ISerializationAdmin *serializationAdmin, IService *) {
_serializationAdmin = nullptr;
}

void addDependencyInstance(IHttpConnectionService *connectionService, IService *) {
_connectionService = connectionService;
}

void addDependencyInstance(IHttpService *svc, IService *) {
_routeRegistration = svc->addRoute(HttpMethod::post, "/test", [this](HttpRequest &req) -> AsyncGenerator<HttpResponse> {
if(dmThreadId != std::this_thread::get_id()) {
throw std::runtime_error("dmThreadId id incorrect");
}
if(testThreadId == std::this_thread::get_id()) {
throw std::runtime_error("testThreadId id incorrect");
}

auto msg = _serializationAdmin->deserialize<TestMsg>(std::move(req.body));
evtGate = true;

co_await *_evt;

if(dmThreadId != std::this_thread::get_id()) {
throw std::runtime_error("dmThreadId id incorrect");
}
if(testThreadId == std::this_thread::get_id()) {
throw std::runtime_error("testThreadId id incorrect");
}

co_return HttpResponse{false, HttpStatus::ok, _serializationAdmin->serialize(TestMsg{11, "hello"}), {}};
});
}

void removeDependencyInstance(IHttpService *, IService *) {
_routeRegistration.reset();
}

void removeDependencyInstance(IHttpConnectionService *connectionService, IService *) {
}

friend DependencyRegister;
friend DependencyManager;

AsyncGenerator<void> sendTestRequest(std::vector<uint8_t> &&toSendMsg) {
auto &response = *co_await _connectionService->sendAsync(HttpMethod::post, "/test", {}, std::move(toSendMsg)).begin();

if(response.status == HttpStatus::ok) {
auto msg = _serializationAdmin->deserialize<TestMsg>(response.body);
} else {
throw std::runtime_error("Status not ok");
}
getManager().pushEvent<QuitEvent>(getServiceId());

co_return;
}

ISerializationAdmin *_serializationAdmin{nullptr};
IHttpConnectionService *_connectionService{nullptr};
std::unique_ptr<HttpRouteRegistration> _routeRegistration{nullptr};
};

0 comments on commit af105a2

Please sign in to comment.