From bd32ec6cdbb854ee88f32fab21c8d7a609dce448 Mon Sep 17 00:00:00 2001 From: James Bonfield Date: Wed, 8 Jun 2016 18:04:41 +0100 Subject: [PATCH] Improvements to avoid boom & bust scenarios. We now continually check how many processing jobs we have vs the output queue size so we don't launch more jobs than fit in the output queue, causing subsequent waiting in theads later on as we over-egged the pudding. This caused a yo-yo effect that overall harms performance. --- htslib/thread_pool.h | 5 +++ thread_pool.c | 75 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/htslib/thread_pool.h b/htslib/thread_pool.h index e77279068..0e99ac9e2 100644 --- a/htslib/thread_pool.h +++ b/htslib/thread_pool.h @@ -152,6 +152,11 @@ typedef struct t_pool { // A single mutex used when updating this and any associated structure. pthread_mutex_t pool_m; + // Tracking of average number of running jobs. + // This can be used to dampen any hysteresis caused by bursty + // input availability. + int n_count, n_running; + // Debugging to check wait time. // FIXME: should we just delete these and cull the associated code? long long total_time, wait_time; diff --git a/thread_pool.c b/thread_pool.c index 468da9c13..190d7e924 100644 --- a/thread_pool.c +++ b/thread_pool.c @@ -126,17 +126,17 @@ static int t_pool_add_result(t_pool_job *j, void *data) { return 0; } -/* - * Returns true or false depending on whether the output queue is full. - */ -static int t_pool_queue_output_full(t_pool_queue *q) { - int full; - - pthread_mutex_lock(&q->p->pool_m); - full = (q->qsize && q->n_output >= q->qsize); - pthread_mutex_unlock(&q->p->pool_m); - return full; -} +// /* +// * Returns true or false depending on whether the output queue is full. +// */ +// static int t_pool_queue_output_full(t_pool_queue *q) { +// int full; +// +// pthread_mutex_lock(&q->p->pool_m); +// full = (q->qsize && q->n_output >= q->qsize); +// pthread_mutex_unlock(&q->p->pool_m); +// return full; +// } static void wake_next_worker(t_pool_queue *q, int locked); @@ -454,10 +454,16 @@ static void *t_pool_worker(void *arg) { // Iterate over queues, finding one with jobs and also // room to put the result. - if (q && q->input_head && !t_pool_queue_output_full(q)) { + //if (q && q->input_head && !t_pool_queue_output_full(q)) { + if (q && q->input_head && q->qsize - q->n_output > p->tsize - p->nwaiting) { + //printf("Work\n"); work_to_do = 1; break; } +// if (q && q->input_head) { +// printf("No work: %d-%d > %d-%d\n", +// q->qsize, q->n_output, p->tsize, p->nwaiting); +// } if (q) q = q->next; } while (q && q != first); @@ -486,6 +492,9 @@ static void *t_pool_worker(void *arg) { p->t_stack_top = w->idx; p->t_stack[w->idx] = 1; +// printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n", +// w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output, +// t_pool_queue_output_full(p->q_head)); pthread_cond_wait(&w->pending_c, &p->pool_m); p->t_stack[w->idx] = 0; @@ -512,7 +521,7 @@ static void *t_pool_worker(void *arg) { // Otherwise work_to_do, so process as many items in this queue as // possible before switching to another queue. This means threads // often end up being dedicated to one type of work. - while (q->input_head && !t_pool_queue_output_full(q)) { + while (q->input_head && q->qsize - q->n_output > q->n_processing) { if (p->shutdown) goto shutdown; @@ -581,7 +590,42 @@ static void wake_next_worker(t_pool_queue *q, int locked) { // rather than how many are waiting. A limit on output queue size makes // these two figures different. assert(p->njobs >= q->n_input); - if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting) + + int running = p->tsize - p->nwaiting; + int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting + && (!q || q->n_processing < q->qsize - q->n_output); + +//#define AVG_USAGE +#ifdef AVG_USAGE + // Track average number of running threads and try to keep close. + // We permit this to change, but slowly. This avoids "boom and bust" cycles + // where we read a lot of data, start a lot of jobs, then become idle again. + // This way some threads run steadily and others dormant, which is better + // for throughput. + // + // It's 50:50 if this is a good thing. It helps some tasks quite significantly + // while slightly hindering other (perhaps more usual) jobs. + + if (++p->n_count == 256) { + p->n_count >>= 1; + p->n_running >>= 1; + } + p->n_running += running; + // Built in lag to avoid see-sawing. Is this safe in all cases? + if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0; +#endif + + if (0) { + printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs, + running, q->n_output, q->qsize - q->n_output, + p->n_running/p->n_count, sig); + int i; + for (i = 0; i < p->tsize; i++) + putchar("x "[p->t_stack[i]]); + putchar('\n'); + } + + if (sig) pthread_cond_signal(&p->t[p->t_stack_top].pending_c); if (!locked) @@ -603,9 +647,12 @@ t_pool *t_pool_init(int n) { p->shutdown = 0; p->q_head = NULL; p->t_stack = NULL; + p->n_count = 0; + p->n_running = 0; #ifdef DEBUG_TIME p->total_time = p->wait_time = 0; #endif + p->t = malloc(n * sizeof(p->t[0]));