Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from semaphore to mutex #672

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ check_include_file("inttypes.h" HAVE_INTTYPES_H)
check_include_file("stdint.h" HAVE_STDINT_H)
check_include_file("stdbool.h" HAVE_STDBOOL_H)
check_include_file("arm_neon.h" FLAC__HAS_NEONINTRIN)
check_include_file("semaphore.h" HAVE_SEMAPHORE_H)

if(NOT HAVE_STDINT_H OR NOT HAVE_STDBOOL_H)
message(SEND_ERROR "Header stdint.h and/or stdbool.h not found")
Expand Down Expand Up @@ -200,7 +199,7 @@ if(CMAKE_BUILD_TYPE STREQUAL Debug OR CMAKE_BUILD_TYPE STREQUAL RelWithDebInfo)
add_definitions(-DFLAC__OVERFLOW_DETECT)
endif()

if(ENABLE_MULTITHREADING AND HAVE_SEMAPHORE_H)
if(ENABLE_MULTITHREADING)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads)
Expand Down
28 changes: 12 additions & 16 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ AM_PROG_CC_C_O
AC_C_INLINE
AC_C_TYPEOF

AC_CHECK_HEADERS([stdint.h stdbool.h inttypes.h byteswap.h sys/auxv.h sys/param.h sys/ioctl.h termios.h x86intrin.h cpuid.h arm_neon.h semaphore.h])
AC_CHECK_HEADERS([stdint.h stdbool.h inttypes.h byteswap.h sys/auxv.h sys/param.h sys/ioctl.h termios.h x86intrin.h cpuid.h arm_neon.h])

if test "x$ac_cv_header_stdint_h" != xyes -o "x$ac_cv_header_stdbool_h" != xyes; then
AC_MSG_ERROR("Header stdint.h and/or stdbool.h not found")
Expand Down Expand Up @@ -377,21 +377,17 @@ AC_ARG_ENABLE([multithreading],

HAVE_PTHREAD=no
if test "x$enable_multithreading" != "xno" ; then
if test "x$ac_cv_header_semaphore_h" != "xyes"; then
AC_MSG_WARN("Header semaphore.h, needed for multithreading, is not found")
else
AX_PTHREAD([
HAVE_PTHREAD=yes
AC_DEFINE(HAVE_PTHREAD,1,[Define if pthread is enabled])
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS"
CC="$PTHREAD_CC"
CXX="$PTHREAD_CXX"
],[HAVE_PTHREAD=no])
if test "x${HAVE_PTHREAD}" == "xno"; then
AC_MSG_WARN("pthread support, needed for multithreading, is not found")
fi
AX_PTHREAD([
HAVE_PTHREAD=yes
AC_DEFINE(HAVE_PTHREAD,1,[Define if pthread is enabled])
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS"
CC="$PTHREAD_CC"
CXX="$PTHREAD_CXX"
],[HAVE_PTHREAD=no])
if test "x${HAVE_PTHREAD}" == "xno"; then
AC_MSG_WARN("pthread support, needed for multithreading, is not found")
fi
fi

Expand Down
122 changes: 96 additions & 26 deletions src/libFLAC/stream_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,6 @@
#endif
#ifdef HAVE_PTHREAD
#include <pthread.h>
#ifdef __APPLE__
/* Mac does have sem_init, but it doesn't work */
#include <dispatch/dispatch.h>
typedef dispatch_semaphore_t sem_t;
#define sem_init(sem,unused,value) ((*sem = dispatch_semaphore_create(value)) == NULL)
#define sem_post(sem) dispatch_semaphore_signal(*sem)
#define sem_wait(sem) dispatch_semaphore_wait(*sem, DISPATCH_TIME_FOREVER)
#define sem_trywait(sem) dispatch_semaphore_wait(*sem, DISPATCH_TIME_NOW)
#define sem_destroy(sem) dispatch_release(*sem)
#else
#include <semaphore.h>
#endif
#endif
#include "share/compat.h"
#include "FLAC/assert.h"
Expand Down Expand Up @@ -203,8 +191,9 @@ typedef struct FLAC__StreamEncoderThreadTask {
FLAC__EntropyCodingMethod_PartitionedRiceContents partitioned_rice_contents_extra[2]; /* from find_best_partition_order_() */
FLAC__bool disable_constant_subframes;
#ifdef HAVE_PTHREAD
sem_t sem_work_available; /* To signal to thread that work is available */
sem_t sem_work_done; /* To signal from thread that work is done */
pthread_mutex_t mutex_this_task; /* To lock whole threadtask */
pthread_cond_t cond_task_done;
FLAC__bool task_done;
FLAC__bool returnvalue;
#endif
} FLAC__StreamEncoderThreadTask;
Expand Down Expand Up @@ -1218,15 +1207,15 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
if(sem_init(&encoder->private_->threadtask[t]->sem_work_available, 0, 0)) {
if(pthread_mutex_init(&encoder->private_->threadtask[t]->mutex_this_task, NULL)) {
FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame);
free(encoder->private_->threadtask[t]);
encoder->private_->threadtask[t] = 0;
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
if(sem_init(&encoder->private_->threadtask[t]->sem_work_done, 0, 0)) {
sem_destroy(&encoder->private_->threadtask[t]->sem_work_available);
if(pthread_cond_init(&encoder->private_->threadtask[t]->cond_task_done, NULL)) {
pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_this_task);
FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame);
free(encoder->private_->threadtask[t]);
encoder->private_->threadtask[t] = 0;
Expand Down Expand Up @@ -1677,6 +1666,22 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)
/* first finish threads */
if(encoder->protected_->num_threads > 1) {
#ifdef HAVE_PTHREAD
/* This is quite complicated, so here is an explanation on what is supposed to happen
*
* Thread no.0 and threadtask no.0 are reserved for non-threaded operation, so counting
* here starts at 1, which makes things slightly more complicated.
*
* If the file processed was very short compared to the requested number of threadtasks,
* not all threadtasks have been populated yet. Handling that is easy: threadtask no.1 needs
* to be processed first, monotonically increasing until the last populated threadtask is
* processed. This number is stored in encoder->private_->num_started_threadtasks
*
* If the file is longer, the next due frame chronologically might not be in threadtask
* number 1, because the threadtasks work like a ringbuffer. To access this, the variable
* twrap starts counting at the next due frame, and the modulo operator (%) is used to
* "wrap" the number with the number of threadtasks. So, if the next due task is 3
* and 4 tasks are started, twrap increases 3, 4, 5, 6, and t follows with values 3, 4, 1, 2.
*/
uint32_t start, end, t, twrap;
if(encoder->private_->num_started_threadtasks < encoder->private_->num_threadtasks) {
start = 1;
Expand All @@ -1689,12 +1694,18 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)
for(twrap = start; twrap < end; twrap++) {
FLAC__ASSERT(twrap > 0);
t = (twrap - 1) % (encoder->private_->num_threadtasks - 1) + 1;
sem_wait(&encoder->private_->threadtask[t]->sem_work_done);
/* Lock mutex, if task isn't done yet, wait for condition */
pthread_mutex_lock(&encoder->private_->threadtask[t]->mutex_this_task);
while(!encoder->private_->threadtask[t]->task_done)
pthread_cond_wait(&encoder->private_->threadtask[t]->cond_task_done,&encoder->private_->threadtask[t]->mutex_this_task);

if(!encoder->private_->threadtask[t]->returnvalue)
ok = false;
if(ok && !write_bitbuffer_(encoder, encoder->private_->threadtask[t], encoder->protected_->blocksize, 0))
ok = false;
pthread_mutex_unlock(&encoder->private_->threadtask[t]->mutex_this_task);
}
/* Wait for MD5 calculation to finish */
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
while(encoder->private_->md5_active || encoder->private_->md5_fifo.tail > 0) {
pthread_cond_wait(&encoder->private_->cond_md5_emptied, &encoder->private_->mutex_work_queue);
Expand All @@ -1717,6 +1728,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)

if(encoder->protected_->num_threads > 1) {
#ifdef HAVE_PTHREAD
/* Properly finish all threads */
uint32_t t;
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
for(t = 1; t < encoder->private_->num_created_threads; t++)
Expand Down Expand Up @@ -2765,8 +2777,8 @@ void free_(FLAC__StreamEncoder *encoder)
if(t > 0) {
#ifdef HAVE_PTHREAD
FLAC__bitwriter_delete(encoder->private_->threadtask[t]->frame);
sem_destroy(&encoder->private_->threadtask[t]->sem_work_available);
sem_destroy(&encoder->private_->threadtask[t]->sem_work_done);
pthread_mutex_destroy(&encoder->private_->threadtask[t]->mutex_this_task);
pthread_cond_destroy(&encoder->private_->threadtask[t]->cond_task_done);
free(encoder->private_->threadtask[t]);
encoder->private_->threadtask[t] = 0;
#else
Expand Down Expand Up @@ -3461,6 +3473,38 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
}
else {
#ifdef HAVE_PTHREAD
/* This bit is quite complicated, so here are some pointers:
*
* When this bit of code is reached for the first time, new threads are spawned and
* threadtasks are populated until the total number of threads equals the requested number
* of threads. Next, threadtasks are populated until there are no more available.
* Next, this main thread checks whether the threadtask that is due chronologically is
* done. If it is, the bitbuffer is written and the threadtask memory reused for the next
* frame. If it is not done, the main thread checks whether there is enough work left in the
* queue. If there is a lot of work left, the main thread starts on some of it too.
* If not a lot of work is left, the main thread goes to sleep until the frame due first is
* finished.
*
* - encoder->private_->next_thread is the number of the next thread to be created or, when
* the required number of threads is created, the next threadtask to be populated,
* or, when all threadtasks have been populated once, the next threadtask that needs
* to finish and thus be reused.
* - encoder->private_->next_threadtask is the number of the next threadtask that a thread
* can start work on.
*
* So, in effect, next_thread is (after startup) a pointer considering the chronological
* order, so input/output isn't shuffled. next_threadtask is a pointer to the next task that
* hasn't been picked up by a thread yet. This distinction enables threads to work on frames
* in a non-chronological order
*
* encoder->protected_->num_threads is the max number of threads that can be spawned
* encoder->private_->num_created_threads is the number of threads that have been spawned
* encoder->private_->num_threadtasks keeps track of how many threadtasks are available
* encoder->private_->num_started_threadtasks keeps track of how many threadtasks have been populated
*
* NOTE: thread no. 0 and threadtask no. 0 are reserved for non-threaded operations, so next_thread
* and next_threadtask start at 1
*/
if(encoder->private_->num_created_threads < encoder->protected_->num_threads) {
/* Create a new thread */
pthread_create(&encoder->private_->thread[encoder->private_->next_thread],
Expand All @@ -3471,10 +3515,20 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
/* If the first task in the queue is still running, check whether there is enough work
* left in the queue. If there is, start on some */
if(encoder->protected_->loose_mid_side_stereo) {
sem_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done);
pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
if(!encoder->private_->threadtask[encoder->private_->next_thread]->task_done)
pthread_cond_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->cond_task_done, &encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
}
else {
while(sem_trywait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done)) {
/* First, check whether the mutex for the next due task is locked or free. If it is free (and thus acquired now) and
* the task is done, proceed to the next bit (writing the bitbuffer). If it is either currently locked or not yet
* processed, choose between starting on some work (if there is enough work in the queue) or waiting for the task
* to finish. Either way, release the mutex first, so it doesn't get interlocked with the work queue mutex */
int mutex_result = pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
while(mutex_result || !encoder->private_->threadtask[encoder->private_->next_thread]->task_done) {
if(!mutex_result)
pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);

pthread_mutex_lock(&encoder->private_->mutex_work_queue);
if(encoder->private_->num_available_threadtasks > (encoder->protected_->num_threads - 1)) {
FLAC__StreamEncoderThreadTask * task = NULL;
Expand All @@ -3484,22 +3538,27 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks)
encoder->private_->next_threadtask = 1;
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
pthread_mutex_lock(&task->mutex_this_task);
process_frame_thread_inner_(encoder, task);
mutex_result = pthread_mutex_trylock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
}
else {
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
sem_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_done);
break;
pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
while(!encoder->private_->threadtask[encoder->private_->next_thread]->task_done)
pthread_cond_wait(&encoder->private_->threadtask[encoder->private_->next_thread]->cond_task_done,&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
mutex_result = 0;
}
}
}
/* Wait for thread to finish, then write bitbuffer */
/* Task is finished, write bitbuffer */
if(!encoder->private_->threadtask[encoder->private_->next_thread]->returnvalue)
return false;
if(!write_bitbuffer_(encoder, encoder->private_->threadtask[encoder->private_->next_thread], encoder->protected_->blocksize, is_last_block)) {
/* the above function sets the state for us in case of an error */
return false;
}
pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
}
/* Copy input data for MD5 calculation */
if(encoder->protected_->do_md5) {
Expand All @@ -3519,14 +3578,18 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
}

/* Copy input data for frame creation */
pthread_mutex_lock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);
for(i = 0; i < encoder->protected_->channels; i++)
memcpy(encoder->private_->threadtask[encoder->private_->next_thread]->integer_signal[i], encoder->private_->threadtask[0]->integer_signal[i], encoder->protected_->blocksize * sizeof(encoder->private_->threadtask[0]->integer_signal[i][0]));

encoder->private_->threadtask[encoder->private_->next_thread]->current_frame_number = encoder->private_->current_frame_number;
pthread_mutex_unlock(&encoder->private_->threadtask[encoder->private_->next_thread]->mutex_this_task);

pthread_mutex_lock(&encoder->private_->mutex_work_queue);
if(encoder->private_->num_started_threadtasks < encoder->private_->num_threadtasks)
encoder->private_->num_started_threadtasks++;
encoder->private_->num_available_threadtasks++;
encoder->private_->threadtask[encoder->private_->next_thread]->task_done = false;
pthread_cond_signal(&encoder->private_->cond_work_available);
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);

Expand Down Expand Up @@ -3563,6 +3626,10 @@ void * process_frame_thread_(void * args) {
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
return NULL;
}
/* The code below pauses and restarts threads if it is noticed threads are often put to sleep
* because of a lack of work. This reduces overhead when too many threads are active. The
* overcommitted indicator is increased when no tasks are available, decreased when more tasks
* are available than threads are running, and reset when a thread is woken up or put to sleep */
if(encoder->private_->num_available_threadtasks == 0)
encoder->private_->overcommitted_indicator++;
else if(encoder->private_->num_available_threadtasks > encoder->private_->num_running_threads)
Expand Down Expand Up @@ -3613,6 +3680,7 @@ void * process_frame_thread_(void * args) {
if(encoder->private_->next_threadtask == encoder->private_->num_threadtasks)
encoder->private_->next_threadtask = 1;
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
pthread_mutex_lock(&task->mutex_this_task);
if(!process_frame_thread_inner_(encoder, task))
return NULL;
}
Expand Down Expand Up @@ -3657,7 +3725,9 @@ FLAC__bool process_frame_thread_inner_(FLAC__StreamEncoder * encoder, FLAC__Stre
ok = false;
}
task->returnvalue = ok;
sem_post(&task->sem_work_done);
task->task_done = true;
pthread_cond_signal(&task->cond_task_done);
pthread_mutex_unlock(&task->mutex_this_task);
return true;
}
#endif
Expand Down
Loading