From 037dc51b198a24feb992ff59b16706c8f5ef5acd Mon Sep 17 00:00:00 2001 From: Kiran Pamnany Date: Tue, 24 Sep 2024 12:47:53 -0400 Subject: [PATCH] Adjust heartbeat behavior (#180) * Add heartbeat pause/resume capability * Add check to avoid negative sleep duration * Disable heartbeats in `jl_print_task_backtraces()` `jl_print_task_backtraces()` can take long enough that there can be heartbeat loss, which can trigger printing task backtraces again, unless it is called from the heartbeat thread which takes care of that possible problem. * Pause heartbeats for GC * Address review comment * Address review comment --- src/gc.c | 5 +++++ src/stackwalk.c | 18 ++++++++++++++- src/threading.c | 59 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 77 insertions(+), 5 deletions(-) diff --git a/src/gc.c b/src/gc.c index dad5768732545..ccafc05e02621 100644 --- a/src/gc.c +++ b/src/gc.c @@ -3734,6 +3734,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) return recollect; } +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); + JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) { JL_PROBE_GC_BEGIN(collection); @@ -3775,6 +3778,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) // existence of the thread in the jl_n_threads count. // // TODO: concurrently queue objects + jl_heartbeat_pause(); jl_fence(); gc_n_threads = jl_atomic_load_acquire(&jl_n_threads); gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states); @@ -3806,6 +3810,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) gc_n_threads = 0; gc_all_tls_states = NULL; + jl_heartbeat_resume(); jl_safepoint_end_gc(); jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); JL_PROBE_GC_END(); diff --git a/src/stackwalk.c b/src/stackwalk.c index 8a12fb2a28143..ab3f13c8e9746 100644 --- a/src/stackwalk.c +++ b/src/stackwalk.c @@ -1166,10 +1166,22 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT } extern int gc_first_tid; +extern int jl_inside_heartbeat_thread(void); +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); -// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr +// Print backtraces for all live tasks, for all threads, to jl_safe_printf +// stderr. This can take a _long_ time! JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT { + // disable heartbeats to prevent heartbeat loss while running this, + // unless this is called from the heartbeat thread itself; in that + // situation, the thread is busy running this and it will not be + // updating the missed heartbeats counter + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_pause(); + } + size_t nthreads = jl_atomic_load_acquire(&jl_n_threads); jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states); int ctid = -1; @@ -1232,6 +1244,10 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1); } jl_safe_printf("thread (%d) ++++ Done\n", ctid); + + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_resume(); + } } #ifdef __cplusplus diff --git a/src/threading.c b/src/threading.c index 8f350d41f64b1..d89e8d09745fc 100644 --- a/src/threading.c +++ b/src/threading.c @@ -1008,6 +1008,45 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, return 0; } +// temporarily pause the heartbeat thread +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + if (!heartbeat_enabled) { + return -1; + } + heartbeat_enabled = 0; + return 0; +} + +// resume the paused heartbeat thread +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + // cannot resume if the heartbeat thread is already running + if (heartbeat_enabled) { + return -1; + } + + // cannot resume if we weren't paused (disabled != paused) + if (heartbeat_interval_s == 0) { + return -1; + } + + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + // reset state as we've been paused + n_hbs_missed = 0; + n_hbs_recvd = 0; + tasks_showed = 0; + + // resume + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + return 0; +} + // heartbeat JL_DLLEXPORT void jl_heartbeat(void) { @@ -1099,7 +1138,7 @@ void jl_heartbeat_threadfun(void *arg) uv_sem_post(&heartbeat_off_sem); // sleep the thread here; this semaphore is posted in - // jl_heartbeat_enable() + // jl_heartbeat_enable() or jl_heartbeat_resume() uv_sem_wait(&heartbeat_on_sem); // Set the sleep duration. @@ -1111,7 +1150,7 @@ void jl_heartbeat_threadfun(void *arg) // heartbeat is enabled; sleep, waiting for the desired interval sleep_for(s, ns); - // if heartbeats were turned off while we were sleeping, reset + // if heartbeats were turned off/paused while we were sleeping, reset if (!heartbeat_enabled) { continue; } @@ -1122,13 +1161,15 @@ void jl_heartbeat_threadfun(void *arg) tchb = jl_hrtime() - t0; // adjust the next sleep duration based on how long the heartbeat - // check took + // check took, but if it took too long then use the normal duration rs = 1; while (tchb > 1e9) { rs++; tchb -= 1e9; } - s = heartbeat_interval_s - rs; + if (rs < heartbeat_interval_s) { + s = heartbeat_interval_s - rs; + } ns = 1e9 - tchb; } } @@ -1150,6 +1191,16 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, return -1; } +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + return -1; +} + +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + return -1; +} + JL_DLLEXPORT void jl_heartbeat(void) { }