Skip to content

Commit 6dddae1

Browse files
committed
improved & fixed ThriftBroker
* renamed swc.ThriftBroker.handlers to *.clients.handlers * fixed process_results CompactResults& * added cfg swc.ThriftBroker.connections.max * added default swc.ThriftBroker.cfg.dyn=swc_thriftbroker.dyn.cfg * added thrift::transport::TSocket to AppHandler ctor & storage * added LOG_INFO Open & Close Connection + total m_connections * added SIGINT app-context at server stopped-serving * fixed ThreadManager stop , 1st stop TThreadPoolServer
1 parent b801231 commit 6dddae1

File tree

8 files changed

+160
-63
lines changed

8 files changed

+160
-63
lines changed

docs/configure/properties/thriftbroker.md

+9-4
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,27 @@ _default_ **```swc.ThriftBroker.port=18000```**
2323
The number of workers a comm-reactor initalizes. \
2424
_default_ **```swc.ThriftBroker.workers=32```**
2525

26-
* ### swc.ThriftBroker.handlers
26+
* ### swc.ThriftBroker.connections.max
2727
```INT32```
28-
The number of Application handlers. \
29-
_default_ **```swc.ThriftBroker.handlers=8```**
28+
The Max client Connections allowed, any new connections above the Open-Connections will be dropped and the Max-Total is the number of Endpoints(Thrift-Broker is listening-on) by `swc.ThriftBroker.connections.max`. If open-file-descriptors is above allowed-limit Thrift-Broker will shutdown, unplanned shutdown can be avoided by the max limit. \
29+
_default_ **```swc.ThriftBroker.workers=INT64_MAX```**
3030

3131
* ### swc.ThriftBroker.transport
3232
```STRING```
3333
The thrift transport that should be used (framed/). \
3434
_default_ **```swc.ThriftBroker.transport=framed```**
3535

36-
3736
* ### swc.ThriftBroker.timeout
3837
```INT32```
3938
The ThriftBroker timeout in milliseconds. \
4039
_default_ **```swc.ThriftBroker.timeout=900000```**
4140

41+
* ### swc.ThriftBroker.clients.handlers
42+
```INT32```
43+
The number of SWC-DB clients handlers. \
44+
_default_ **```swc.ThriftBroker.clients.handlers=8```**
45+
46+
4247

4348
***
4449

src/cc/bin/swcdb/thriftbroker/main.cc

+33-8
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ int run() {
3535

3636
uint32_t reactors = 1; // settings->get_i32("swc.ThriftBroker.reactors");
3737
int workers = settings->get_i32("swc.ThriftBroker.workers");
38+
uint64_t conns_max = settings->get_i64("swc.ThriftBroker.connections.max");
3839
uint32_t timeout_ms = settings->get_i16("swc.ThriftBroker.timeout");
3940
std::string transport = settings->get_str("swc.ThriftBroker.transport");
4041

@@ -77,6 +78,7 @@ int run() {
7778

7879

7980
auto app_ctx = std::make_shared<AppContext>();
81+
std::vector<std::unique_ptr<std::thread>> threads;
8082
std::vector<std::shared_ptr<thrift::server::TThreadPoolServer>> servers;
8183

8284
for(uint32_t reactor=0; reactor < reactors; ++reactor) {
@@ -114,22 +116,45 @@ int run() {
114116
protocol,
115117
threadManager
116118
);
119+
server->setConcurrentClientLimit(conns_max);
120+
117121
servers.push_back(server);
118-
std::thread([server]{ server->serve(); }).detach();
119-
120-
SWC_LOG_OUT(LOG_INFO,
121-
SWC_LOG_OSTREAM << "Listening On: " << endpoint
122-
<< " fd=" << (ssize_t)server->getServerTransport()->getSocketFD()
123-
<< ' ' << (is_plain ? "PLAIN" : "SECURE");
122+
threads.emplace_back(
123+
new std::thread([app_ctx, is_plain, endpoint, server] {
124+
SWC_LOG_OUT(LOG_INFO, SWC_LOG_OSTREAM
125+
<< "Listening On: " << endpoint
126+
<< " fd=" << (ssize_t)server->getServerTransport()->getSocketFD()
127+
<< ' ' << (is_plain ? "PLAIN" : "SECURE");
128+
);
129+
130+
server->serve();
131+
132+
SWC_LOG_OUT(LOG_INFO, SWC_LOG_OSTREAM
133+
<< "Stopping to Listen On: " << endpoint
134+
<< " fd=" << (ssize_t)server->getServerTransport()->getSocketFD()
135+
<< ' ' << (is_plain ? "PLAIN" : "SECURE");
136+
);
137+
app_ctx->shutting_down(std::error_code(), SIGINT);
138+
})
124139
);
125140
}
126141
}
127142

128143
app_ctx->wait_while_run();
129144

130-
quick_exit(0);
131-
for(auto& server : servers)
145+
for(auto& server : servers) {
146+
server->stop();
132147
server->getThreadManager()->stop();
148+
}
149+
servers.clear();
150+
151+
for(auto& th : threads)
152+
th->join();
153+
threads.clear();
154+
155+
SWC_LOG(LOG_INFO, "Exit");
156+
//quick_exit(0);
157+
133158

134159
return 0);
135160

src/cc/include/swcdb/thrift/broker/AppContext.h

+46-18
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class AppContext final : virtual public BrokerIfFactory {
2828

2929
AppContext() : m_run(true) {
3030
auto settings = Env::Config::settings();
31-
Env::IoCtx::init(settings->get_i32("swc.ThriftBroker.handlers"));
31+
Env::IoCtx::init(settings->get_i32("swc.ThriftBroker.clients.handlers"));
3232

3333
int sig = 0;
3434
Env::IoCtx::io()->set_signals();
@@ -55,62 +55,90 @@ class AppContext final : virtual public BrokerIfFactory {
5555
}
5656

5757
virtual ~AppContext() { }
58-
58+
5959
void wait_while_run() {
6060
std::unique_lock lock_wait(m_mutex);
6161
m_cv.wait(lock_wait, [this]{return !m_run;});
6262
}
6363

64-
BrokerIf* getHandler(const thrift::TConnectionInfo&) override { //connInfo
65-
return new AppHandler;
64+
BrokerIf* getHandler(const thrift::TConnectionInfo& connInfo) override {
65+
AppHandler* handler = new AppHandler(
66+
std::dynamic_pointer_cast<thrift::transport::TSocket>(
67+
connInfo.transport));
68+
if(handler->socket) try {
69+
SWC_LOG_OUT(LOG_INFO,
70+
SWC_LOG_OSTREAM << "Connection Opened(hdlr=" << size_t(handler)
71+
<< " [" << handler->socket->getPeerAddress() << "]:"
72+
<< handler->socket->getPeerPort()
73+
<< ") open=" << m_connections.increment_and_count();
74+
);
75+
} catch(...) {
76+
SWC_LOG_CURRENT_EXCEPTION("");
77+
}
78+
return handler;
6679
}
6780

6881
void releaseHandler(ServiceIf* hdlr) override {
6982
AppHandler* handler = dynamic_cast<AppHandler*>(hdlr);
7083
handler->disconnected();
71-
72-
delete hdlr;
73-
}
74-
75-
private:
7684

77-
78-
void shutting_down(const std::error_code &ec, const int &sig) {
85+
if(handler->socket) try {
86+
SWC_LOG_OUT(LOG_INFO,
87+
SWC_LOG_OSTREAM << "Connection Closed(hdlr=" << size_t(handler)
88+
<< " [" << handler->socket->getPeerAddress() << "]:"
89+
<< handler->socket->getPeerPort()
90+
<< ") open=" << m_connections.decrement_and_count();
91+
);
92+
} catch(...) {
93+
SWC_LOG_CURRENT_EXCEPTION("");
94+
}
95+
delete handler;
96+
}
7997

80-
if(sig==0){ // set signals listener
98+
void shutting_down(const std::error_code& ec, const int& sig) {
99+
if(sig == 0) { // set signals listener
81100
Env::IoCtx::io()->signals()->async_wait(
82-
[ptr=this](const std::error_code &ec, const int &sig){
101+
[this](const std::error_code& ec, const int &sig) {
102+
if(ec == asio::error::operation_aborted)
103+
return;
83104
SWC_LOGF(LOG_INFO, "Received signal, sig=%d ec=%s", sig, ec.message().c_str());
84-
ptr->shutting_down(ec, sig);
105+
shutting_down(ec, sig);
85106
}
86107
);
87108
SWC_LOGF(LOG_INFO, "Listening for Shutdown signal, set at sig=%d ec=%s",
88109
sig, ec.message().c_str());
89110
return;
111+
} else {
112+
113+
std::scoped_lock lock(m_mutex);
114+
if(!m_run)
115+
return;
116+
m_run = false;
90117
}
118+
91119
SWC_LOGF(LOG_INFO, "Shutdown signal, sig=%d ec=%s", sig, ec.message().c_str());
92120

93121
stop();
94122
}
95123

124+
private:
125+
96126
void stop() {
97127

98128
Env::Clients::get()->rgr->stop();
99129
Env::Clients::get()->mngr->stop();
130+
Env::IoCtx::io()->stop();
100131

101132
//Env::FsInterface::interface()->stop();
102-
Env::IoCtx::io()->stop();
103133

104-
SWC_LOG(LOG_INFO, "Exit");
105-
106134
std::scoped_lock lock(m_mutex);
107-
m_run = false;
108135
m_cv.notify_all();
109136
}
110137

111138
std::mutex m_mutex;
112139
bool m_run;
113140
std::condition_variable m_cv;
141+
Common::Stats::CompletionCounter<size_t> m_connections;
114142
};
115143

116144

src/cc/include/swcdb/thrift/broker/AppHandler.h

+20-13
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
#define swcdb_app_thriftbroker_AppHandler_h
88

99

10-
#include "swcdb/thrift/gen-cpp/Broker.h"
10+
#include "swcdb/core/Semaphore.h"
1111
#include "swcdb/db/client/sql/SQL.h"
12+
#include "swcdb/thrift/gen-cpp/Broker.h"
1213
#include "swcdb/thrift/utils/Converter.h"
1314

15+
#include <thrift/transport/TSocket.h>
16+
1417

1518
namespace SWC {
1619
namespace thrift = apache::thrift;
@@ -25,6 +28,12 @@ using namespace Thrift;
2528
class AppHandler final : virtual public BrokerIf {
2629
public:
2730

31+
const std::shared_ptr<thrift::transport::TSocket> socket;
32+
33+
AppHandler(const std::shared_ptr<thrift::transport::TSocket>& socket)
34+
: socket(socket) {
35+
}
36+
2837
virtual ~AppHandler() { }
2938

3039
/* SQL any */
@@ -600,25 +609,23 @@ class AppHandler final : virtual public BrokerIf {
600609
Converter::exception(err);
601610
}
602611

603-
std::mutex mutex;
604-
std::promise<void> res;
605-
for(auto& schema : dbschemas) {
612+
size_t sz = dbschemas.size();
613+
Core::Semaphore sem(sz, sz);
614+
_return.resize(sz);
615+
for(size_t idx = 0; idx < sz; ++idx) {
616+
auto& r = _return[idx];
606617
Comm::Protocol::Mngr::Req::ColumnCompact::request(
607-
schema->cid,
608-
[&mutex, &_return, await=&res, cid=schema->cid, sz=dbschemas.size()]
618+
(r.cid = dbschemas[idx]->cid),
619+
[&sem, err=&r.err]
609620
(const Comm::client::ConnQueue::ReqBase::Ptr&,
610621
const Comm::Protocol::Mngr::Params::ColumnCompactRsp& rsp) {
611-
std::scoped_lock lock(mutex);
612-
auto& r = _return.emplace_back();
613-
r.cid=cid;
614-
r.err=rsp.err;
615-
if(_return.size() == sz)
616-
await->set_value();
622+
*err = rsp.err;
623+
sem.release();
617624
},
618625
300000
619626
);
620627
}
621-
res.get_future().wait();
628+
sem.wait_all();
622629
}
623630

624631
static void process_results(

src/cc/include/swcdb/thrift/broker/Settings.h

+16-7
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,25 @@ void Settings::init_app_options() {
1919
init_client_options();
2020

2121
file_desc.add_options()
22-
("swc.ThriftBroker.cfg", str(), "Specific cfg-file for ThriftBroker")
22+
("swc.ThriftBroker.cfg", str(),
23+
"Specific cfg-file for ThriftBroker")
2324
("swc.ThriftBroker.cfg.dyn", strs(),
2425
"Specific dyn. cfg-file for ThriftBroker")
2526

26-
("swc.ThriftBroker.port", i16(18000), "ThriftBroker port")
27-
("swc.ThriftBroker.transport", str("framed"), "ThriftBroker timeout")
28-
29-
("swc.ThriftBroker.workers", i32(32), "Number of Comm-Workers")
30-
("swc.ThriftBroker.timeout", i32(900000), "ThriftBroker timeout")
31-
("swc.ThriftBroker.handlers", i32(8), "Number of App Handlers")
27+
("swc.ThriftBroker.port", i16(18000),
28+
"ThriftBroker port")
29+
("swc.ThriftBroker.transport", str("framed"),
30+
"ThriftBroker timeout")
31+
32+
("swc.ThriftBroker.workers", i32(32),
33+
"Number of Comm-Workers")
34+
("swc.ThriftBroker.connections.max", i64(INT64_MAX),
35+
"The Max client Connections allowed , total= addrs X this")
36+
37+
("swc.ThriftBroker.timeout", i32(900000),
38+
"ThriftBroker timeout")
39+
("swc.ThriftBroker.clients.handlers", i32(8),
40+
"The number of SWC-DB clients handlers")
3241
;
3342
}
3443

src/etc/swcdb/swc_thriftbroker.cfg

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111

1212

13-
# swc.ThriftBroker.cfg.dyn=swc_thriftbroker.dyn.cfg
13+
swc.ThriftBroker.cfg.dyn=swc_thriftbroker.dyn.cfg
1414

1515
# swc.ThriftBroker.port=18000 # default ThriftBroker port
1616
swc.ThriftBroker.transport=framed # /zlib
1717

1818
swc.ThriftBroker.workers=32
19+
swc.ThriftBroker.connections.max=1000
1920
swc.ThriftBroker.timeout=900000
20-
swc.ThriftBroker.handlers=8
21+
swc.ThriftBroker.clients.handlers=8
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# SWC-DB© Copyright since 2019 Alex Kashirin <[email protected]>
2+
# License details at <https://github.com/kashirin-alex/swc-db/#license>
3+
4+
5+
6+
# Dynamic Configurations
7+
## swcdbThriftBroker read the file
8+
# documentations at
9+
# https://www.swcdb.org/configure/properties/thriftbroker.html
10+
##
11+
12+
13+
14+
swc.logging.level=info

tests/integration/thrift/client.cc

+19-11
Original file line numberDiff line numberDiff line change
@@ -204,17 +204,25 @@ void sql_list_columns_all(Client& client) {
204204
void sql_compact_columns(Client& client) {
205205
std::cout << std::endl << "test: sql_compact_columns all: " << std::endl;
206206
CompactResults res;
207-
client.sql_compact_columns(
208-
res,
209-
"compact columns "
210-
);
211-
assert(res.size() >= 2);
212-
std::cout << std::endl << "CompactResults.size=" << res.size() << std::endl;
213-
for(auto& r : res) {
214-
assert(!r.err);
215-
r.printTo(std::cout << " ");
216-
std::cout << std::endl;
217-
}
207+
208+
bool err = false;
209+
size_t count = 0;
210+
do {
211+
res.clear();
212+
client.sql_compact_columns(res, "compact columns ");
213+
assert(res.size() >= 2);
214+
std::cout << std::endl
215+
<< "CompactResults.size=" << res.size()
216+
<< " waited=" << count << "ms" << std::endl;
217+
for(auto& r : res) {
218+
r.printTo(std::cout << " ");
219+
std::cout << std::endl;
220+
if((err = r.err))
221+
break;
222+
}
223+
std::this_thread::sleep_for(std::chrono::microseconds(1000));
224+
++count;
225+
} while(err);
218226

219227
std::cout << std::endl << "test: sql_compact_columns test-columns: " << std::endl;
220228
res.clear();

0 commit comments

Comments
 (0)