-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathserver_manager_link_wrapper.h
132 lines (117 loc) · 4.43 KB
/
server_manager_link_wrapper.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* ===============================================================
* Description: Coordinator link wrapper for shards.
*
* Created: 2014-02-10 14:52:59
*
* Author: Robert Escriva, [email protected]
* Ayush Dubey, [email protected]
*
* Copyright (C) 2013, Cornell University, see the LICENSE file
* for licensing agreement
* ===============================================================
*/
// Most of the following code has been 'borrowed' from
// Robert Escriva's HyperDex coordinator.
// see https://github.com/rescrv/HyperDex for the original code.
#ifndef weaver_shard_server_manager_link_wrapper_h_
#define weaver_shard_server_manager_link_wrapper_h_
// C
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
// STL
#include <map>
#include <memory>
#include <queue>
#include <utility>
// po6
#include <po6/net/location.h>
#include <po6/threads/thread.h>
#include <po6/threads/cond.h>
#include <po6/threads/mutex.h>
// e
#include <e/intrusive_ptr.h>
// Weaver
#include "common/configuration.h"
#include "common/server_manager_link.h"
#include "common/ids.h"
// The thread that calls the constructor can call everything. All other
// threads are left with the threadsafe block below.
class server_manager_link_wrapper
{
public:
server_manager_link_wrapper(server_id us, std::shared_ptr<po6::net::location> loc);
~server_manager_link_wrapper() throw ();
public:
void set_server_manager_address(const char* host, uint16_t port);
bool get_unique_number(uint64_t &id);
bool register_id(server_id us, const po6::net::location& bind_to, server::type_t type);
bool should_exit();
bool maintain_link();
const configuration& config();
void request_shutdown();
// threadsafe
public:
void config_ack(uint64_t version);
void config_stable(uint64_t version);
void report_tcp_disconnect(uint64_t id);
private:
class sm_rpc;
class sm_rpc_available;
class sm_rpc_config_ack;
class sm_rpc_config_stable;
typedef std::map<int64_t, e::intrusive_ptr<sm_rpc> > rpc_map_t;
private:
void background_maintenance();
void do_sleep();
void reset_sleep();
void enter_critical_section();
void exit_critical_section();
void enter_critical_section_killable();
void enter_critical_section_background();
void exit_critical_section_killable();
void ensure_available();
void ensure_config_ack();
void ensure_config_stable();
void make_rpc(const char* func,
const char* data, size_t data_sz,
e::intrusive_ptr<sm_rpc> rpc);
int64_t make_rpc_nosync(const char* func,
const char* data, size_t data_sz,
e::intrusive_ptr<sm_rpc> rpc);
int64_t make_rpc_defended(const char* enter_func,
const char* enter_data, size_t enter_data_sz,
const char* exit_func,
const char* exit_data, size_t exit_data_sz,
e::intrusive_ptr<sm_rpc> rpc);
int64_t wait_nosync(const char* cond, uint64_t state,
e::intrusive_ptr<sm_rpc> rpc);
private:
server_id m_us;
std::shared_ptr<po6::net::location> m_loc;
po6::threads::thread m_poller;
std::queue<std::pair<int64_t, replicant_returncode>> m_deferred;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
std::auto_ptr<server_manager_link> m_sm;
#pragma GCC diagnostic pop
rpc_map_t m_rpcs;
po6::threads::mutex m_mtx;
po6::threads::cond m_cond;
bool m_poller_started;
bool m_locked;
bool m_kill;
pthread_t m_to_kill;
uint64_t m_waiting;
uint64_t m_sleep;
int64_t m_online_id;
bool m_shutdown_requested;
// make sure we reliably ack
bool m_need_config_ack;
uint64_t m_config_ack;
int64_t m_config_ack_id;
// make sure we reliably stabilize
bool m_need_config_stable;
uint64_t m_config_stable;
int64_t m_config_stable_id;
private:
server_manager_link_wrapper(const server_manager_link_wrapper&);
server_manager_link_wrapper& operator = (const server_manager_link_wrapper&);
};
#endif // weaver_shard_server_manager_link_wrapper_h_