-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHWQueue.cpp
128 lines (102 loc) · 2.94 KB
/
HWQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
* C++ implementation of queue from the Herlihy & Wing paper
*
* Outputs data in format that can be parsed by https://github.com/thisismiller/dbdiag
* See: https://transactional.blog/blog/2022-dbdiag-ophistory
*/
#include <atomic>
#include <cassert>
#include <print>
#include <random>
#include <thread>
#include <vector>
using std::atomic;
using std::printf;
using std::this_thread::get_id;
using std::this_thread::yield;
using std::thread;
using std::vector;
template <typename T>
class Queue {
atomic<int> back;
atomic<T *> *items;
public:
Queue(int sz) : back(0), items(new atomic<T *>[sz]) {}
~Queue() { delete[] items; }
void enq(T *x);
T *deq();
};
template<typename T>
void Queue<T>::enq(T *x) {
int i = back++;
items[i] = x;
}
template<typename T>
T *Queue<T>::deq() {
while (true) {
int range = back;
for (int i = 0; i < range; ++i) {
T *x = std::atomic_exchange(&items[i], nullptr);
if (x != nullptr) return x;
}
}
}
void produce(Queue<char> *queue, char letters[26], int n) {
std::hash<thread::id> hasher;
unsigned int id = hasher(get_id()) % 100;
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
std::default_random_engine generator(seed);
std::uniform_int_distribution<int> distribution(0, 25);
char *c;
for(int i=0; i<n; ++i) {
int offset = distribution(generator);
printf("%02u: enq(%c) a\n", id, letters[offset]);
c = letters + offset;
queue->enq(c);
printf("%02u: END a\n", id);
}
}
void consume(Queue<char> *queue, int n) {
std::hash<thread::id> hasher;
unsigned int id = hasher(get_id()) % 100;
for(int i=0; i<n; ++i) {
yield();
printf("%02u: deq() a\n", id); yield();
char *c = queue->deq(); yield();
printf("%02u: %c a\n", id, *c); yield();
}
}
void join_all(vector<thread> &threads) {
for(auto& t : threads ) {
t.join();
}
}
char letters[26]; // defined globally so it never goes out of scope
void start_producers(Queue<char> *queue, int n, int max_iterations) {
for (char c = 'A'; c <= 'Z'; ++c) {
letters[c-'A']=c;
}
vector<thread> threads;
for(int i=0;i<n;++i) {
threads.push_back(thread(produce, queue, letters, max_iterations));
}
join_all(threads);
}
void start_consumers(Queue<char> *queue, int n, int max_iterations) {
vector<thread> threads;
for(int i=0;i<n;++i) {
threads.push_back(thread(consume, queue, max_iterations));
}
join_all(threads);
}
int main() {
int producers = 3;
int consumers = 3;
int max_iterations = 3;
Queue<char> queue(producers*max_iterations);
thread tp = thread(start_producers, &queue, producers, max_iterations);
thread tc = thread(start_consumers, &queue, consumers, max_iterations);
tp.join();
tc.join();
return 0;
}