-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbounded_buffer.c
191 lines (158 loc) · 4.9 KB
/
bounded_buffer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/**
* Thread-safe bounded buffer
*
* Multiple threads accessing the same bounded buffer - skeleton code.
*
* Author: Nikos Nikoleris <[email protected]>
*
*/
#include <stdio.h> /* printf(), fprintf */
#include <stdlib.h> /* [s]rand() */
#include <unistd.h> /* sleep() */
#include <pthread.h> /* pthread_... */
#include <semaphore.h> /* sem_... */
#define BUFFER_SIZE 5
#define PRODUCERS 2
#define CONSUMERS 4
#define ITERATIONS 20
#define PRODUCER_ITERATIONS (ITERATIONS / PRODUCERS)
#define CONSUMER_ITERATIONS (ITERATIONS / CONSUMERS)
typedef struct {
int value[BUFFER_SIZE];
int next_in, next_out;
} buffer_t;
sem_t prod_sem; //producer semaphore
sem_t con_sem; //consumer semaphore
sem_t mutex; //Used to enforce mutual exclusion
buffer_t buffer;
pthread_t consumer_tid[CONSUMERS], producer_tid[PRODUCERS];
/* *
* insert_item - thread safe(?) function to insert items to the bounded buffer
* @param item the value to be inserted
* @return 0 in case of sucess -1 otherwise
*/
int
insert_item(int item, long int id)
{
/* TODO: Check and wait if the buffer is full. Ensure exclusive
* access to the buffer and use the existing code to remove an item.
*/
sem_wait(&prod_sem);
sem_wait(&mutex);
//sem_wait(&prod_sem); Adding this line and removing the first line (Switching the waits) cases deadlock
buffer.value[buffer.next_in] = item;
buffer.next_in = (buffer.next_in + 1) % BUFFER_SIZE;
printf("producer %ld: inserted %d\n", id, item);
sem_post(&mutex); //Release the buffer from the critical state (Let consumer know it can take an item)
sem_post(&con_sem); //Increment the number of full spots
//sem_post(&mutex); Adding this line and removing the top one leads to starvation (All data goes into one index in the buffer)
return 0;
}
/**
* remove_item - thread safe(?) function to remove items to the bounded buffer
* @param item the address of the variable that the removed value will be written
* @return 0 in case of sucess -1 otherwise
*/
int
remove_item(int *item, long int id)
{
/* TODO: Check and wait if the buffer is empty. Ensure exclusive
* access to the buffer and use the existing code to remove an item.
*/
sem_wait(&con_sem);
sem_wait(&mutex);
//sem_wait(&con_sem); Adding this line and removing the first one causes deadlock
*item = buffer.value[buffer.next_out];
buffer.value[buffer.next_out] = -1;
buffer.next_out = (buffer.next_out + 1) % BUFFER_SIZE;
printf("consumer %ld: removed %d\n", id, *item);
sem_post(&mutex); //Release the buffer from the critical state (Let producer know it can add an item)
sem_post(&prod_sem); //Release an empty spot
//sem_post(&mutex); //Adding this line and removing the top one leads to starvation
return 0;
}
/**
* producer - will iterate PRODUCER_ITERATION times and call the corresponding
* function to insert an integer to the bounded buffer
* @param param an integer id of the producer used to distinguish between the
* multiple producer threads
* @return nothing
*/
void *
producer(void *param)
{
int item, i;
long int id = (long int)param;
printf("producer started\n");
i = PRODUCER_ITERATIONS;
while (i--) {
sleep(rand() % 3);
item = rand() % 10000;
if (insert_item(item, id))
fprintf(stderr, "Error while inserting to buffer\n");
}
pthread_exit(0);
}
/**
* consumer - will iterate CONSUMER_ITERATION times and call the corresponding
* function to remove an integer from the bounded buffer
* @param param an integer id of the producer used to distinguish between the
* multiple consumer threads
* @return nothing
*/
void *
consumer(void *param)
{
int item, i;
long int id = (long int)param;
printf("consumer started\n");
i = CONSUMER_ITERATIONS;
while (i--) {
sleep(rand() % 3);
if (remove_item(&item, id))
fprintf(stderr, "Error while removing from buffer\n");
}
pthread_exit(0);
}
int
main()
{
long int i;
srand(time(NULL));
//Initialize the semaphores
sem_init(&con_sem,0,0);
sem_init(&prod_sem,0,BUFFER_SIZE);
sem_init(&mutex,0,1);
/* Create the consumer threads */
for (i = 0; i < CONSUMERS; i++)
if (pthread_create(&consumer_tid[i], NULL, consumer, (void *)i) != 0) {
perror("pthread_create");
abort();
}
/* Create the producer threads */
for (i = 0; i < PRODUCERS; i++)
if (pthread_create(&producer_tid[i], NULL, producer, (void *)i) != 0) {
perror("pthread_create");
abort();
}
/* Wait for them to complete */
for (i = 0; i < CONSUMERS; i++)
if (pthread_join(consumer_tid[i], NULL) != 0) {
perror("pthread_join");
abort();
}
for (i = 0; i < PRODUCERS; i++)
if (pthread_join(producer_tid[i], NULL) != 0) {
perror("pthread_join");
abort();
}
return 0;
}
/*
* Local Variables:
* mode: c
* c-basic-offset: 4
* indent-tabs-mode: nil
* c-file-style: "stroustrup"
* End:
*/