-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdisruptor.h
153 lines (119 loc) · 4.09 KB
/
disruptor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// A Disruptor is a "magic ring buffer" used in concurrent programming to
// achieve high throughput and low latency processing in a multi-producer,
// multi-consumer multicast setting.
//
// Each producer and consumer maintains a cursor into the ring buffer, which
// tells it whether a slot is available for reading/writing. Cursors can
// "follow" other cursors to prevent them from getting ahead/behind. For
// example, by making all producer cursors follow consumer cursors, we ensure
// that producers cannot write to the ring buffer if the slowest consumer has
// not consumed previously written values. Similarly, if we set consumer
// cursors to follow producers, then fast consumers can never overtake the
// slowest producer. When both producers and consumers follow each other, we
// effectively have a lock-free MPMC multicast queue, where all consumers can
// access data written onto the ring buffer (as opposed to a traditional MC
// unicast queue where a single consumer pops off the data).
//
// For more information on the design of the disruptor, see:
//
// https://www.baeldung.com/lmax-disruptor-concurrency
#pragma once
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <limits>

#include <boost/interprocess/offset_ptr.hpp>
// Spin-wait hint issued on every iteration of SPIN_UNTIL. The mnemonic is
// architecture specific, so pick one the current target's assembler accepts
// and fall back to a plain busy loop on targets without a known hint.
#if defined(__x86_64__) || defined(__i386__)
#define DISRUPTOR_SPIN_HINT() asm("pause")
#elif defined(__aarch64__)
#define DISRUPTOR_SPIN_HINT() asm("yield")
#else
#define DISRUPTOR_SPIN_HINT() ((void)0)
#endif

// Busy-waits until `cond` evaluates to true. `cond` is re-evaluated on every
// iteration, so any side effects it has are repeated.
//
// The expansion is a complete `while` statement (deliberately not wrapped in
// do/while(0)), so it is valid with or without a trailing semicolon.
#define SPIN_UNTIL(cond)     \
  while (!((cond))) {        \
    DISRUPTOR_SPIN_HINT();   \
  }
// Alignment, in bytes, applied to Sequence (via alignas) to keep separate
// sequences off the same cache line.
// NOTE(review): 64 is presumably the cache-line size of the intended target;
// confirm before building for other architectures.
#define CACHE_LINE_SZ 64
using boost::interprocess::offset_ptr;
// Atomic 64-bit sequence number.
//
// Instances are aligned to CACHE_LINE_SZ bytes so that two sequences never
// share a cache line, which avoids false sharing between the threads that
// update them.
class alignas(CACHE_LINE_SZ) Sequence {
 public:
  // Creates a sequence holding `v` (0 when omitted).
  Sequence(int64_t v = 0) : value_(v) {}

  // Reads the current value with acquire ordering.
  int64_t acquire() const {
    const int64_t current = value_.load(std::memory_order_acquire);
    return current;
  }

  // Replaces the current value with `value` using release ordering.
  void store(int64_t value) {
    value_.store(value, std::memory_order_release);
  }

 private:
  std::atomic<int64_t> value_;
};
// Fixed-capacity circular storage holding `size` elements of type T.
//
// Callers address elements by an ever-growing position; at() keeps only the
// low bits of that position to pick a slot, which is why `size` is required
// to be a non-zero power of two.
template <typename T, size_t size>
class RingBuffer {
  static_assert(size != 0 && (size & (size - 1)) == 0,
                "disruptor size must be a power of two");

 public:
  // Returns the element stored in the slot that `pos` wraps onto.
  const T& at(int64_t pos) const { return buffer_[slot(pos)]; }
  T& at(int64_t pos) { return buffer_[slot(pos)]; }

 private:
  // Maps a position to its slot index, i.e. `pos` modulo `size`.
  static constexpr size_t slot(int64_t pos) {
    return static_cast<size_t>(pos) & (size - 1);
  }

  std::array<T, size> buffer_;
};
class EventCursor;
// Holds the set of cursors that another cursor must stay behind ("follow").
//
// Followed cursors are kept as offset_ptr values in a fixed array of
// MAX_FOLLOWERS slots; an empty slot is a null pointer.
class SequenceBarrier {
 public:
  // Registers `e` in the first empty slot.
  //
  // If every slot is already taken the process is terminated: silently
  // dropping `e` would leave the barrier ignoring that cursor's progress.
  void follows(const EventCursor* e) {
    for (auto& cursor : limit_seq_) {
      if (!cursor) {
        cursor = e;
        return;
      }
    }
    std::cerr << "SequenceBarrier: cannot follow more than " << MAX_FOLLOWERS
              << " cursors" << std::endl;
    std::abort();
  }

  // Defined out of line (not visible in this header section).
  // NOTE(review): presumably returns the smallest position among the followed
  // cursors -- confirm against the definition.
  int64_t getMin();

  // Defined out of line (not visible in this header section).
  // NOTE(review): presumably blocks until every followed cursor has reached
  // `pos` and returns the smallest position seen -- confirm against the
  // definition.
  int64_t waitFor(int64_t pos) const;

 private:
  // Maximum number of cursors one barrier can follow.
  static constexpr int MAX_FOLLOWERS = 8;

  std::array<offset_ptr<const EventCursor>, MAX_FOLLOWERS> limit_seq_{};

  // Mutable so that the const waitFor() can update it.
  mutable int64_t last_min_ = -1;
};
// State shared by reader and writer cursors: a local [begin, end) range of
// positions, the barrier of cursors being followed, and the sequence that is
// published for other cursors to observe.
class EventCursor {
 public:
  // Starts with an empty range, begin == end == `b`.
  EventCursor(int64_t b = 0) : begin_(b), end_(b) {}

  // First position of the local range.
  int64_t begin() const { return begin_; }

  // One past the last position of the local range.
  int64_t end() const { return end_; }

  // The published sequence, as seen by cursors that follow this one.
  const Sequence& pos() const { return cursor_; }

  // Makes this cursor follow `e` by registering it with the barrier.
  void follows(const EventCursor* e) {
    barrier_.follows(e);
  }

  // Marks every position up to and including `p` as processed: the local
  // range now starts at `p + 1`, and `p` is stored into the published
  // sequence.
  void publish(int64_t p) {
    begin_ = p + 1;
    cursor_.store(p);
  }

 protected:
  int64_t begin_;
  int64_t end_;
  SequenceBarrier barrier_;
  Sequence cursor_;
};
// Cursor used by a consumer. Its end is always one past the position reported
// by the barrier.
class ReadCursor : public EventCursor {
 public:
  // Starts with an empty range at `p` and a published sequence of -1.
  ReadCursor(int64_t p = 0) : EventCursor(p) {
    cursor_.store(-1);
  }

  // Asks the barrier to wait for `pos`, then records and returns the new end
  // (the barrier's result plus one). Logs the request to stdout.
  int64_t waitFor(int64_t pos) {
    std::cout << "read cursor requesting new end after " << pos << std::endl;
    const int64_t reached = barrier_.waitFor(pos);
    end_ = reached + 1;
    return end_;
  }

  // Recomputes the end from the barrier's current minimum, records it and
  // returns it.
  int64_t checkEnd() {
    const int64_t lowest = barrier_.getMin();
    end_ = lowest + 1;
    return end_;
  }
};
// Cursor used by a producer writing into a ring of `size_` slots.
//
// A followed cursor that has published position `m` has released every slot
// up to and including `m`, so positions up to `m + size_` may be written and
// the exclusive end of the writable range is `m + size_ + 1`. This matches
// the initial state set up by the constructor: followed cursors start at -1
// and the writable range is [0, size_).
class WriteCursor : public EventCursor {
 public:
  // Creates a cursor for a ring of `s` slots: nothing published yet (sequence
  // -1) and the whole ring, [0, s), writable.
  WriteCursor(int64_t s) : size_{s} {
    begin_ = 0;
    end_ = size_;
    cursor_.store(-1);
  }

  // Asks the barrier to wait for position `pos - size_` (the previous
  // occupant of the slot `pos` maps to), then records and returns the new
  // exclusive end of the writable range. Logs the request to stdout.
  int64_t waitFor(int64_t pos) {
    std::cout << "write cursor requesting new end after " << pos << std::endl;
    return end_ = endAfter(barrier_.waitFor(pos - size_), pos);
  }

  // Recomputes the writable end from the barrier's current minimum, using the
  // same formula as waitFor(), then records and returns it.
  //
  // NOTE(review): this assumes getMin() reports the same "nothing to wait
  // for" sentinel that waitFor() handles -- confirm against its definition.
  // In that case the ring starting at begin_ is treated as fully writable.
  int64_t checkEnd() { return end_ = endAfter(barrier_.getMin(), begin_ - 1); }

 private:
  // Converts the barrier's minimum position into an exclusive writable end.
  //
  // A minimum equal to the largest int64_t is treated as "no constraint"
  // (presumably the barrier follows no cursor); the end is then derived from
  // `pos` instead, which also avoids overflowing `min + size_ + 1`.
  int64_t endAfter(int64_t min, int64_t pos) const {
    constexpr int64_t kNoConstraint = std::numeric_limits<int64_t>::max();
    const int64_t base = (min == kNoConstraint) ? pos : min;
    return base + size_ + 1;
  }

  int64_t size_;
};