From 18eeb9de49bcc92fb320829cf9da8dbd88b14a83 Mon Sep 17 00:00:00 2001 From: Martijn van Beurden Date: Fri, 23 Feb 2024 09:33:38 +0100 Subject: [PATCH 1/5] Add explanation as to how threading works --- src/libFLAC/stream_encoder.c | 50 ++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c index fa21ebce0d..c61c2a028b 100644 --- a/src/libFLAC/stream_encoder.c +++ b/src/libFLAC/stream_encoder.c @@ -1677,6 +1677,22 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) /* first finish threads */ if(encoder->protected_->num_threads > 1) { #ifdef HAVE_PTHREAD + /* This is quite complicated, so here is an explanation on what is supposed to happen + * + * Thread no.0 and threadtask no.0 are reserved for non-threaded operation, so counting + * here starts at 1, which makes things slightly more complicated. + * + * If the file processed was very short compared to the requested number of threadtasks, + * not all threadtasks have been populated yet. Handling that is easy: threadtask no.1 needs + * to be processed first, monotonically increasing until the last populated threadtask is + * processed. This number is stored in encoder->private_->num_started_threadtasks + * + * If the file is longer, the next due frame chronologically might not be in threadtasks + * number 1, because the threadtasks work like a ringbuffer. To access this, the variable + * twrap starts counting at the next due frame, and the modulo operator (%) is used to + * "wrap" the number with the number of threadtasks. So, if the next due task is 3 + * and 4 tasks are started, twrap increases 3, 4, 5, 6, and t follows with values 3, 4, 1, 2. + */ uint32_t start, end, t, twrap; if(encoder->private_->num_started_threadtasks < encoder->private_->num_threadtasks) { start = 1; @@ -1695,6 +1711,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) if(ok && !write_bitbuffer_(encoder, encoder->private_->threadtask[t], encoder->protected_->blocksize, 0)) ok = false; } + /* Wait for MD5 calculation to finish */ pthread_mutex_lock(&encoder->private_->mutex_work_queue); while(encoder->private_->md5_active || encoder->private_->md5_fifo.tail > 0) { pthread_cond_wait(&encoder->private_->cond_md5_emptied, &encoder->private_->mutex_work_queue); @@ -1717,6 +1734,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) if(encoder->protected_->num_threads > 1) { #ifdef HAVE_PTHREAD + /* Properly finish all threads */ uint32_t t; pthread_mutex_lock(&encoder->private_->mutex_work_queue); for(t = 1; t < encoder->private_->num_created_threads; t++) @@ -3461,6 +3479,38 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block } else { #ifdef HAVE_PTHREAD + /* This bit is quite complicated, so here are some pointers: + * + * When this bit of code is reached for the first time, new threads are spawned and + * threadtasks are populated until the total number of threads equals the requested number + * of threads. Next, threadtasks are populated until they there are no more available. + * Next, this main thread checks whether the threadtask that is due chronologically is + * done. If it is, the bitbuffer is written and the threadtask memory reused for the next + * frame. If it is not done, the main thread checks whether there is enough work left in the + * queue. If there is a lot of work left, the main thread starts on some of it too. + * If not a lot of work is left, the main thread goes to sleep until the frame due first is + * finished. + * + * - encoder->private_->next_thread is the number of the next thread to be created or, when + * the required number of threads is created, the next threadtask to be populated, + * or, when all threadtasks have been populated once, the next threadtask that needs + * to finish and thus reused. + * - encoder->private_->next_threadtask is the number of the next threadtask that a thread + * can start work on. + * + * So, in effect, next_thread is (after startup) a pointer considering the chronological + * order, so input/output isn't shuffled. next_threadtask is a pointer to the next task that + * hasn't been picked up by a thread yet. This distinction enables threads to work on frames + * in a non-chronological order + * + * encoder->protected_->num_threads is the max number of threads that can be spawned + * encoder->private_->num_created_threads is the number of threads that has been spawned + * encoder->private_->num_threadtasks keeps track of how many threadtasks are available + * encoder->private_->num_started_threadtasks keeps track of how many threadtasks have been populated + * + * NOTE: thread no. 0 and threadtask no. 0 are reserved for non-threaded operations, so next_thread + * and next_threadtask start at 1 + */ if(encoder->private_->num_created_threads < encoder->protected_->num_threads) { /* Create a new thread */ pthread_create(&encoder->private_->thread[encoder->private_->next_thread], From 8b30a3653604273d8751f8655ce094e013be1bb9 Mon Sep 17 00:00:00 2001 From: Martijn van Beurden Date: Fri, 23 Feb 2024 11:55:40 +0100 Subject: [PATCH 2/5] Add extra debug checks for threading --- src/libFLAC/stream_encoder.c | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c index c61c2a028b..d406590d2e 100644 --- a/src/libFLAC/stream_encoder.c +++ b/src/libFLAC/stream_encoder.c @@ -94,6 +94,15 @@ typedef dispatch_semaphore_t sem_t; * yielding compression within 0.1% of the optimal parameters. */ #undef ENABLE_RICE_PARAMETER_SEARCH +/* To enable or disable extra checks during threading. This is useful + * for debugging. Using it requires something like threadsanitizer + * to notice the result of the extra checks and output info + */ +#if defined(__has_feature) +# if __has_feature(thread_sanitizer) +# define ENABLE_EXTRA_THREADING_CHECKS +# endif +#endif typedef struct { @@ -206,6 +215,9 @@ typedef struct FLAC__StreamEncoderThreadTask { sem_t sem_work_available; /* To signal to thread that work is available */ sem_t sem_work_done; /* To signal from thread that work is done */ FLAC__bool returnvalue; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + pthread_mutex_t mutex_extra_check; +#endif #endif } FLAC__StreamEncoderThreadTask; @@ -1233,6 +1245,17 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_( encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } +#ifdef ENABLE_EXTRA_THREADING_CHECKS + if(pthread_mutex_init(&encoder->private_->threadtask[t]->mutex_extra_check, NULL)) { + sem_destroy(&encoder->private_->threadtask[t]->sem_work_available); + sem_destroy(&encoder->private_->threadtask[t]->sem_work_done); + FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); + free(encoder->private_->threadtask[t]); + encoder->private_->threadtask[t] = 0; + encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; + return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; + } +#endif for(i = 0; i < FLAC__MAX_CHANNELS; i++) { encoder->private_->threadtask[t]->subframe_workspace_ptr[i][0] = &encoder->private_->threadtask[t]->subframe_workspace[i][0]; @@ -1706,10 +1729,19 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) FLAC__ASSERT(twrap > 0); t = (twrap - 1) % (encoder->private_->num_threadtasks - 1) + 1; sem_wait(&encoder->private_->threadtask[t]->sem_work_done); +#ifdef ENABLE_EXTRA_THREADING_CHECKS + if(pthread_mutex_trylock(&encoder->private_->threadtask[t]->mutex_extra_check) != 0) { + /* To trigger threadsanitizer to output a nice summary */ + pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_extra_check); + } +#endif if(!encoder->private_->threadtask[t]->returnvalue) ok = false; if(ok && !write_bitbuffer_(encoder, encoder->private_->threadtask[t], encoder->protected_->blocksize, 0)) ok = false; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + pthread_mutex_unlock(&encoder->private_->threadtask[t]->mutex_extra_check); +#endif } /* Wait for MD5 calculation to finish */ pthread_mutex_lock(&encoder->private_->mutex_work_queue); @@ -2785,6 +2817,9 @@ void free_(FLAC__StreamEncoder *encoder) FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); sem_destroy(&encoder->private_->threadtask[t]->sem_work_available); sem_destroy(&encoder->private_->threadtask[t]->sem_work_done); +#ifdef ENABLE_EXTRA_THREADING_CHECKS + pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_extra_check); +#endif free(encoder->private_->threadtask[t]); encoder->private_->threadtask[t] = 0; #else @@ -3529,6 +3564,12 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block if(encoder->private_->num_available_threadtasks > (encoder->protected_->num_threads - 1)) { FLAC__StreamEncoderThreadTask * task = NULL; task = encoder->private_->threadtask[encoder->private_->next_threadtask]; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + if(pthread_mutex_trylock(&task->mutex_extra_check) != 0) { + /* To trigger threadsanitizer to output a nice summary */ + pthread_mutex_destroy(&task->mutex_extra_check); + } +#endif encoder->private_->num_available_threadtasks--; encoder->private_->next_threadtask++; if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks) @@ -3546,10 +3587,20 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block /* Wait for thread to finish, then write bitbuffer */ if(!encoder->private_->threadtask[encoder->private_->next_thread]->returnvalue) return false; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + if(pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check) != 0) { + /* To trigger threadsanitizer to output a nice summary */ + pthread_mutex_destroy(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check); + + } +#endif if(!write_bitbuffer_(encoder, encoder->private_->threadtask[encoder->private_->next_thread], encoder->protected_->blocksize, is_last_block)) { /* the above function sets the state for us in case of an error */ return false; } +#ifdef ENABLE_EXTRA_THREADING_CHECKS + pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check); +#endif } /* Copy input data for MD5 calculation */ if(encoder->protected_->do_md5) { @@ -3658,6 +3709,13 @@ void * process_frame_thread_(void * args) { else if(encoder->private_->num_available_threadtasks > 0) { FLAC__StreamEncoderThreadTask * task = NULL; task = encoder->private_->threadtask[encoder->private_->next_threadtask]; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + if(pthread_mutex_trylock(&task->mutex_extra_check) != 0) { + /* To trigger threadsanitizer to output a nice summary */ + pthread_mutex_destroy(&task->mutex_extra_check); + } +#endif + encoder->private_->num_available_threadtasks--; encoder->private_->next_threadtask++; if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks) @@ -3707,6 +3765,9 @@ FLAC__bool process_frame_thread_inner_(FLAC__StreamEncoder * encoder, FLAC__Stre ok = false; } task->returnvalue = ok; +#ifdef ENABLE_EXTRA_THREADING_CHECKS + pthread_mutex_unlock(&task->mutex_extra_check); +#endif sem_post(&task->sem_work_done); return true; } From ad75ff75a8079e1c189190d6ae52953c10a0caa5 Mon Sep 17 00:00:00 2001 From: Martijn van Beurden Date: Mon, 26 Feb 2024 10:22:30 +0100 Subject: [PATCH 3/5] Switch from semaphores to mutexes, remove extra debug checks --- src/libFLAC/stream_encoder.c | 122 +++++++++++------------------------ 1 file changed, 36 insertions(+), 86 deletions(-) diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c index d406590d2e..95b6f98422 100644 --- a/src/libFLAC/stream_encoder.c +++ b/src/libFLAC/stream_encoder.c @@ -45,18 +45,6 @@ #endif #ifdef HAVE_PTHREAD #include -#ifdef __APPLE__ -/* Mac does have sem_init, but it doesn't work */ -#include -typedef dispatch_semaphore_t sem_t; -#define sem_init(sem,unused,value) ((*sem = dispatch_semaphore_create(value)) == NULL) -#define sem_post(sem) dispatch_semaphore_signal(*sem) -#define sem_wait(sem) dispatch_semaphore_wait(*sem, DISPATCH_TIME_FOREVER) -#define sem_trywait(sem) dispatch_semaphore_wait(*sem, DISPATCH_TIME_NOW) -#define sem_destroy(sem) dispatch_release(*sem) -#else -#include -#endif #endif #include "share/compat.h" #include "FLAC/assert.h" @@ -94,15 +82,6 @@ typedef dispatch_semaphore_t sem_t; * yielding compression within 0.1% of the optimal parameters. */ #undef ENABLE_RICE_PARAMETER_SEARCH -/* To enable or disable extra checks during threading. This is useful - * for debugging. Using it requires something like threadsanitizer - * to notice the result of the extra checks and output info - */ -#if defined(__has_feature) -# if __has_feature(thread_sanitizer) -# define ENABLE_EXTRA_THREADING_CHECKS -# endif -#endif typedef struct { @@ -212,12 +191,10 @@ typedef struct FLAC__StreamEncoderThreadTask { FLAC__EntropyCodingMethod_PartitionedRiceContents partitioned_rice_contents_extra[2]; /* from find_best_partition_order_() */ FLAC__bool disable_constant_subframes; #ifdef HAVE_PTHREAD - sem_t sem_work_available; /* To signal to thread that work is available */ - sem_t sem_work_done; /* To signal from thread that work is done */ + pthread_mutex_t mutex_this_task; /* To lock whole threadtask */ + pthread_cond_t cond_task_done; + FLAC__bool task_done; FLAC__bool returnvalue; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - pthread_mutex_t mutex_extra_check; -#endif #endif } FLAC__StreamEncoderThreadTask; @@ -1230,32 +1207,21 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_( encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } - if(sem_init(&encoder->private_->threadtask[t]->sem_work_available, 0, 0)) { - FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); - free(encoder->private_->threadtask[t]); - encoder->private_->threadtask[t] = 0; - encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; - return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; - } - if(sem_init(&encoder->private_->threadtask[t]->sem_work_done, 0, 0)) { - sem_destroy(&encoder->private_->threadtask[t]->sem_work_available); + if(pthread_mutex_init(&encoder->private_->threadtask[t]->mutex_this_task, NULL)) { FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); free(encoder->private_->threadtask[t]); encoder->private_->threadtask[t] = 0; encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } -#ifdef ENABLE_EXTRA_THREADING_CHECKS - if(pthread_mutex_init(&encoder->private_->threadtask[t]->mutex_extra_check, NULL)) { - sem_destroy(&encoder->private_->threadtask[t]->sem_work_available); - sem_destroy(&encoder->private_->threadtask[t]->sem_work_done); + if(pthread_cond_init(&encoder->private_->threadtask[t]->cond_task_done, NULL)) { + pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_this_task); FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); free(encoder->private_->threadtask[t]); encoder->private_->threadtask[t] = 0; encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } -#endif for(i = 0; i < FLAC__MAX_CHANNELS; i++) { encoder->private_->threadtask[t]->subframe_workspace_ptr[i][0] = &encoder->private_->threadtask[t]->subframe_workspace[i][0]; @@ -1728,20 +1694,15 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) for(twrap = start; twrap < end; twrap++) { FLAC__ASSERT(twrap > 0); t = (twrap - 1) % (encoder->private_->num_threadtasks - 1) + 1; - sem_wait(&encoder->private_->threadtask[t]->sem_work_done); -#ifdef ENABLE_EXTRA_THREADING_CHECKS - if(pthread_mutex_trylock(&encoder->private_->threadtask[t]->mutex_extra_check) != 0) { - /* To trigger threadsanitizer to output a nice summary */ - pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_extra_check); - } -#endif + pthread_mutex_lock(&encoder->private_->threadtask[t]->mutex_this_task); + while(!encoder->private_->threadtask[t]->task_done) + pthread_cond_wait(&encoder->private_->threadtask[t]->cond_task_done,&encoder->private_->threadtask[t]->mutex_this_task); + if(!encoder->private_->threadtask[t]->returnvalue) ok = false; if(ok && !write_bitbuffer_(encoder, encoder->private_->threadtask[t], encoder->protected_->blocksize, 0)) ok = false; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - pthread_mutex_unlock(&encoder->private_->threadtask[t]->mutex_extra_check); -#endif + pthread_mutex_unlock(&encoder->private_->threadtask[t]->mutex_this_task); } /* Wait for MD5 calculation to finish */ pthread_mutex_lock(&encoder->private_->mutex_work_queue); @@ -2815,11 +2776,8 @@ void free_(FLAC__StreamEncoder *encoder) if(t > 0) { #ifdef HAVE_PTHREAD FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame); - sem_destroy(&encoder->private_->threadtask[t]->sem_work_available); - sem_destroy(&encoder->private_->threadtask[t]->sem_work_done); -#ifdef ENABLE_EXTRA_THREADING_CHECKS - pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_extra_check); -#endif + pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_this_task); + pthread_cond_destroy(&encoder->private_->threadtask[t]->cond_task_done); free(encoder->private_->threadtask[t]); encoder->private_->threadtask[t] = 0; #else @@ -3556,51 +3514,46 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block /* If the first task in the queue is still running, check whether there is enough work * left in the queue. If there is, start on some */ if(encoder->protected_->loose_mid_side_stereo) { - sem_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done); + pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + if(!encoder->private_->threadtask[encoder->private_->next_thread]->task_done) + pthread_cond_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->cond_task_done, &encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); } else { - while(sem_trywait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done)) { + int mutex_result = pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + while(mutex_result || !encoder->private_->threadtask[encoder->private_->next_thread]->task_done) { + if(!mutex_result) + pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + pthread_mutex_lock(&encoder->private_->mutex_work_queue); if(encoder->private_->num_available_threadtasks > (encoder->protected_->num_threads - 1)) { FLAC__StreamEncoderThreadTask * task = NULL; task = encoder->private_->threadtask[encoder->private_->next_threadtask]; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - if(pthread_mutex_trylock(&task->mutex_extra_check) != 0) { - /* To trigger threadsanitizer to output a nice summary */ - pthread_mutex_destroy(&task->mutex_extra_check); - } -#endif encoder->private_->num_available_threadtasks--; encoder->private_->next_threadtask++; if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks) encoder->private_->next_threadtask = 1; pthread_mutex_unlock(&encoder->private_->mutex_work_queue); + pthread_mutex_lock(&task->mutex_this_task); process_frame_thread_inner_(encoder, task); + mutex_result = pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); } else { pthread_mutex_unlock(&encoder->private_->mutex_work_queue); - sem_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done); - break; + pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + while(!encoder->private_->threadtask[encoder->private_->next_thread]->task_done) + pthread_cond_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->cond_task_done,&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + mutex_result = 0; } } } /* Wait for thread to finish, then write bitbuffer */ if(!encoder->private_->threadtask[encoder->private_->next_thread]->returnvalue) return false; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - if(pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check) != 0) { - /* To trigger threadsanitizer to output a nice summary */ - pthread_mutex_destroy(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check); - - } -#endif if(!write_bitbuffer_(encoder, encoder->private_->threadtask[encoder->private_->next_thread], encoder->protected_->blocksize, is_last_block)) { /* the above function sets the state for us in case of an error */ return false; } -#ifdef ENABLE_EXTRA_THREADING_CHECKS - pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_extra_check); -#endif + pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); } /* Copy input data for MD5 calculation */ if(encoder->protected_->do_md5) { @@ -3620,14 +3573,18 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block } /* Copy input data for frame creation */ + pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); for(i = 0; i < encoder->protected_->channels; i++) memcpy(encoder->private_->threadtask[encoder->private_->next_thread]->integer_signal[i], encoder->private_->threadtask[0]->integer_signal[i], encoder->protected_->blocksize * sizeof(encoder->private_->threadtask[0]->integer_signal[i][0])); encoder->private_->threadtask[encoder->private_->next_thread]->current_frame_number = encoder->private_->current_frame_number; + pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); + pthread_mutex_lock(&encoder->private_->mutex_work_queue); if(encoder->private_->num_started_threadtasks < encoder->private_->num_threadtasks) encoder->private_->num_started_threadtasks++; encoder->private_->num_available_threadtasks++; + encoder->private_->threadtask[encoder->private_->next_thread]->task_done = false; pthread_cond_signal(&encoder->private_->cond_work_available); pthread_mutex_unlock(&encoder->private_->mutex_work_queue); @@ -3709,18 +3666,12 @@ void * process_frame_thread_(void * args) { else if(encoder->private_->num_available_threadtasks > 0) { FLAC__StreamEncoderThreadTask * task = NULL; task = encoder->private_->threadtask[encoder->private_->next_threadtask]; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - if(pthread_mutex_trylock(&task->mutex_extra_check) != 0) { - /* To trigger threadsanitizer to output a nice summary */ - pthread_mutex_destroy(&task->mutex_extra_check); - } -#endif - encoder->private_->num_available_threadtasks--; encoder->private_->next_threadtask++; if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks) encoder->private_->next_threadtask = 1; pthread_mutex_unlock(&encoder->private_->mutex_work_queue); + pthread_mutex_lock(&task->mutex_this_task); if(!process_frame_thread_inner_(encoder, task)) return NULL; } @@ -3765,10 +3716,9 @@ FLAC__bool process_frame_thread_inner_(FLAC__StreamEncoder * encoder, FLAC__Stre ok = false; } task->returnvalue = ok; -#ifdef ENABLE_EXTRA_THREADING_CHECKS - pthread_mutex_unlock(&task->mutex_extra_check); -#endif - sem_post(&task->sem_work_done); + task->task_done = true; + pthread_cond_signal(&task->cond_task_done); + pthread_mutex_unlock(&task->mutex_this_task); return true; } #endif From 583e9fc1d349fda0f1bcbc63fd94d1bf6da3f3e1 Mon Sep 17 00:00:00 2001 From: Martijn van Beurden Date: Mon, 26 Feb 2024 10:50:09 +0100 Subject: [PATCH 4/5] Add more explanation to threading code --- src/libFLAC/stream_encoder.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c index 95b6f98422..071402fb51 100644 --- a/src/libFLAC/stream_encoder.c +++ b/src/libFLAC/stream_encoder.c @@ -1694,6 +1694,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) for(twrap = start; twrap < end; twrap++) { FLAC__ASSERT(twrap > 0); t = (twrap - 1) % (encoder->private_->num_threadtasks - 1) + 1; + /* Lock mutex, if task isn't done yet, wait for condition */ pthread_mutex_lock(&encoder->private_->threadtask[t]->mutex_this_task); while(!encoder->private_->threadtask[t]->task_done) pthread_cond_wait(&encoder->private_->threadtask[t]->cond_task_done,&encoder->private_->threadtask[t]->mutex_this_task); @@ -3519,6 +3520,10 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block pthread_cond_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->cond_task_done, &encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); } else { + /* First, check whether the mutex for the next due task is locked or free. If it is free (and thus acquired now) and + * the task is done, proceed to the next bit (writing the bitbuffer). If it is either currently locked or not yet + * processed, choose between starting on some work (if there is enough work in the queue) or waiting for the task + * to finish. Either way, release the mutex first, so it doesn't get interlocked with the work queue mutex */ int mutex_result = pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task); while(mutex_result || !encoder->private_->threadtask[encoder->private_->next_thread]->task_done) { if(!mutex_result) @@ -3546,7 +3551,7 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block } } } - /* Wait for thread to finish, then write bitbuffer */ + /* Task is finished, write bitbuffer */ if(!encoder->private_->threadtask[encoder->private_->next_thread]->returnvalue) return false; if(!write_bitbuffer_(encoder, encoder->private_->threadtask[encoder->private_->next_thread], encoder->protected_->blocksize, is_last_block)) { @@ -3621,6 +3626,10 @@ void * process_frame_thread_(void * args) { pthread_mutex_unlock(&encoder->private_->mutex_work_queue); return NULL; } + /* The code below pauses and restarts threads if it is noticed threads are often put too sleep + * because of a lack of work. This reduces overhead when too many threads are active. The + * overcommited indicator is increased when no tasks are available, decreased when more tasks + * are available then threads are running, and reset when a thread is woken up or put to sleep */ if(encoder->private_->num_available_threadtasks == 0) encoder->private_->overcommitted_indicator++; else if(encoder->private_->num_available_threadtasks > encoder->private_->num_running_threads) From a238e877b1b6ba9781f9aa181e932963b0c35931 Mon Sep 17 00:00:00 2001 From: Martijn van Beurden Date: Mon, 26 Feb 2024 13:36:55 +0100 Subject: [PATCH 5/5] Remove checks for semaphore.h in CMake and configure --- CMakeLists.txt | 3 +-- configure.ac | 28 ++++++++++++---------------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a81ff6f616..56c00db86b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,7 +114,6 @@ check_include_file("inttypes.h" HAVE_INTTYPES_H) check_include_file("stdint.h" HAVE_STDINT_H) check_include_file("stdbool.h" HAVE_STDBOOL_H) check_include_file("arm_neon.h" FLAC__HAS_NEONINTRIN) -check_include_file("semaphore.h" HAVE_SEMAPHORE_H) if(NOT HAVE_STDINT_H OR NOT HAVE_STDBOOL_H) message(SEND_ERROR "Header stdint.h and/or stdbool.h not found") @@ -200,7 +199,7 @@ if(CMAKE_BUILD_TYPE STREQUAL Debug OR CMAKE_BUILD_TYPE STREQUAL RelWithDebInfo) add_definitions(-DFLAC__OVERFLOW_DETECT) endif() -if(ENABLE_MULTITHREADING AND HAVE_SEMAPHORE_H) +if(ENABLE_MULTITHREADING) set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(THREADS_PREFER_PTHREAD_FLAG TRUE) find_package(Threads) diff --git a/configure.ac b/configure.ac index dc572d3632..a8896b21d1 100644 --- a/configure.ac +++ b/configure.ac @@ -53,7 +53,7 @@ AM_PROG_CC_C_O AC_C_INLINE AC_C_TYPEOF -AC_CHECK_HEADERS([stdint.h stdbool.h inttypes.h byteswap.h sys/auxv.h sys/param.h sys/ioctl.h termios.h x86intrin.h cpuid.h arm_neon.h semaphore.h]) +AC_CHECK_HEADERS([stdint.h stdbool.h inttypes.h byteswap.h sys/auxv.h sys/param.h sys/ioctl.h termios.h x86intrin.h cpuid.h arm_neon.h]) if test "x$ac_cv_header_stdint_h" != xyes -o "x$ac_cv_header_stdbool_h" != xyes; then AC_MSG_ERROR("Header stdint.h and/or stdbool.h not found") @@ -377,21 +377,17 @@ AC_ARG_ENABLE([multithreading], HAVE_PTHREAD=no if test "x$enable_multithreading" != "xno" ; then - if test "x$ac_cv_header_semaphore_h" != "xyes"; then - AC_MSG_WARN("Header semaphore.h, needed for multithreading, is not found") - else - AX_PTHREAD([ - HAVE_PTHREAD=yes - AC_DEFINE(HAVE_PTHREAD,1,[Define if pthread is enabled]) - LIBS="$PTHREAD_LIBS $LIBS" - CFLAGS="$CFLAGS $PTHREAD_CFLAGS" - CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS" - CC="$PTHREAD_CC" - CXX="$PTHREAD_CXX" - ],[HAVE_PTHREAD=no]) - if test "x${HAVE_PTHREAD}" == "xno"; then - AC_MSG_WARN("pthread support, needed for multithreading, is not found") - fi + AX_PTHREAD([ + HAVE_PTHREAD=yes + AC_DEFINE(HAVE_PTHREAD,1,[Define if pthread is enabled]) + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS" + CC="$PTHREAD_CC" + CXX="$PTHREAD_CXX" + ],[HAVE_PTHREAD=no]) + if test "x${HAVE_PTHREAD}" == "xno"; then + AC_MSG_WARN("pthread support, needed for multithreading, is not found") fi fi