Skip to content

Commit

Permalink
Improvements to avoid boom & bust scenarios.
Browse files Browse the repository at this point in the history
We now continually check how many processing jobs we have vs the
output queue size so we don't launch more jobs than fit in the output
queue, causing subsequent waiting in theads later on as we over-egged
the pudding.  This caused a yo-yo effect that overall harms
performance.
  • Loading branch information
jkbonfield committed Jun 8, 2016
1 parent 2af39f8 commit bd32ec6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 14 deletions.
5 changes: 5 additions & 0 deletions htslib/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ typedef struct t_pool {
// A single mutex used when updating this and any associated structure.
pthread_mutex_t pool_m;

// Tracking of average number of running jobs.
// This can be used to dampen any hysteresis caused by bursty
// input availability.
int n_count, n_running;

// Debugging to check wait time.
// FIXME: should we just delete these and cull the associated code?
long long total_time, wait_time;
Expand Down
75 changes: 61 additions & 14 deletions thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,17 @@ static int t_pool_add_result(t_pool_job *j, void *data) {
return 0;
}

/*
* Returns true or false depending on whether the output queue is full.
*/
static int t_pool_queue_output_full(t_pool_queue *q) {
int full;

pthread_mutex_lock(&q->p->pool_m);
full = (q->qsize && q->n_output >= q->qsize);
pthread_mutex_unlock(&q->p->pool_m);
return full;
}
// /*
// * Returns true or false depending on whether the output queue is full.
// */
// static int t_pool_queue_output_full(t_pool_queue *q) {
// int full;
//
// pthread_mutex_lock(&q->p->pool_m);
// full = (q->qsize && q->n_output >= q->qsize);
// pthread_mutex_unlock(&q->p->pool_m);
// return full;
// }

static void wake_next_worker(t_pool_queue *q, int locked);

Expand Down Expand Up @@ -454,10 +454,16 @@ static void *t_pool_worker(void *arg) {

// Iterate over queues, finding one with jobs and also
// room to put the result.
if (q && q->input_head && !t_pool_queue_output_full(q)) {
//if (q && q->input_head && !t_pool_queue_output_full(q)) {
if (q && q->input_head && q->qsize - q->n_output > p->tsize - p->nwaiting) {
//printf("Work\n");
work_to_do = 1;
break;
}
// if (q && q->input_head) {
// printf("No work: %d-%d > %d-%d\n",
// q->qsize, q->n_output, p->tsize, p->nwaiting);
// }

if (q) q = q->next;
} while (q && q != first);
Expand Down Expand Up @@ -486,6 +492,9 @@ static void *t_pool_worker(void *arg) {
p->t_stack_top = w->idx;

p->t_stack[w->idx] = 1;
// printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n",
// w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
// t_pool_queue_output_full(p->q_head));
pthread_cond_wait(&w->pending_c, &p->pool_m);
p->t_stack[w->idx] = 0;

Expand All @@ -512,7 +521,7 @@ static void *t_pool_worker(void *arg) {
// Otherwise work_to_do, so process as many items in this queue as
// possible before switching to another queue. This means threads
// often end up being dedicated to one type of work.
while (q->input_head && !t_pool_queue_output_full(q)) {
while (q->input_head && q->qsize - q->n_output > q->n_processing) {
if (p->shutdown)
goto shutdown;

Expand Down Expand Up @@ -581,7 +590,42 @@ static void wake_next_worker(t_pool_queue *q, int locked) {
// rather than how many are waiting. A limit on output queue size makes
// these two figures different.
assert(p->njobs >= q->n_input);
if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)

int running = p->tsize - p->nwaiting;
int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
&& (!q || q->n_processing < q->qsize - q->n_output);

//#define AVG_USAGE
#ifdef AVG_USAGE
// Track average number of running threads and try to keep close.
// We permit this to change, but slowly. This avoids "boom and bust" cycles
// where we read a lot of data, start a lot of jobs, then become idle again.
// This way some threads run steadily and others dormant, which is better
// for throughput.
//
// It's 50:50 if this is a good thing. It helps some tasks quite significantly
// while slightly hindering other (perhaps more usual) jobs.

if (++p->n_count == 256) {
p->n_count >>= 1;
p->n_running >>= 1;
}
p->n_running += running;
// Built in lag to avoid see-sawing. Is this safe in all cases?
if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
#endif

if (0) {
printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
running, q->n_output, q->qsize - q->n_output,
p->n_running/p->n_count, sig);
int i;
for (i = 0; i < p->tsize; i++)
putchar("x "[p->t_stack[i]]);
putchar('\n');
}

if (sig)
pthread_cond_signal(&p->t[p->t_stack_top].pending_c);

if (!locked)
Expand All @@ -603,9 +647,12 @@ t_pool *t_pool_init(int n) {
p->shutdown = 0;
p->q_head = NULL;
p->t_stack = NULL;
p->n_count = 0;
p->n_running = 0;
#ifdef DEBUG_TIME
p->total_time = p->wait_time = 0;
#endif


p->t = malloc(n * sizeof(p->t[0]));

Expand Down

0 comments on commit bd32ec6

Please sign in to comment.