Skip to content

Commit 13572f3

Browse files
thinkharderdevfsdvh
authored andcommitted
Record when job is queued in scheduler metrics (#28)
* Record when job is queueud in scheduler metrics * add additional buckets for exec times
1 parent 9cb8ed1 commit 13572f3

File tree

4 files changed

+48
-4
lines changed

4 files changed

+48
-4
lines changed

ballista/scheduler/src/metrics/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ use std::sync::Arc;
2626
/// will be passed when constructing the `QueryStageScheduler` which is the core event loop of the scheduler.
2727
/// The event loop will then record metric events through this trait.
2828
pub trait SchedulerMetricsCollector: Send + Sync {
29+
/// Record that job with `job_id` was queued. This will be invoked
30+
/// when the job is queued for scheduling.
31+
/// When invoked should specify the timestamp in milliseconds when the job was queued
32+
fn record_queued(&self, job_id: &str, queued_at: u64);
2933
/// Record that job with `job_id` was submitted. This will be invoked
3034
/// after the job's `ExecutionGraph` is created and it is ready to be scheduled
3135
/// on executors.
@@ -62,6 +66,7 @@ pub trait SchedulerMetricsCollector: Send + Sync {
6266
pub struct NoopMetricsCollector {}
6367

6468
impl SchedulerMetricsCollector for NoopMetricsCollector {
69+
fn record_queued(&self, _job_id: &str, _queued_at: u64) {}
6570
fn record_submitted(&self, _job_id: &str, _queued_at: u64, _submitted_at: u64) {}
6671
fn record_completed(&self, _job_id: &str, _queued_at: u64, _completed_att: u64) {}
6772
fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {}

ballista/scheduler/src/metrics/prometheus.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,17 @@ pub struct PrometheusMetricsCollector {
4646
completed: Counter,
4747
submitted: Counter,
4848
pending_queue_size: Gauge,
49+
active_jobs: Gauge,
4950
}
5051

5152
impl PrometheusMetricsCollector {
5253
pub fn new(registry: &Registry) -> Result<Self> {
5354
let execution_time = register_histogram_with_registry!(
5455
"job_exec_time_seconds",
5556
"Histogram of successful job execution time in seconds",
56-
vec![0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64],
57+
vec![
58+
0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64, 120_f64, 180_f64, 240_f64, 300_f64
59+
],
5760
registry
5861
)
5962
.map_err(|e| {
@@ -63,7 +66,10 @@ impl PrometheusMetricsCollector {
6366
let planning_time = register_histogram_with_registry!(
6467
"planning_time_ms",
6568
"Histogram of job planning time in milliseconds",
66-
vec![1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 500.0_f64],
69+
vec![
70+
1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 200.0_f64, 300_f64, 400_f64,
71+
500_f64, 1_000_f64, 2_000_f64, 3_000_f64
72+
],
6773
registry
6874
)
6975
.map_err(|e| {
@@ -115,6 +121,15 @@ impl PrometheusMetricsCollector {
115121
BallistaError::Internal(format!("Error registering metric: {e:?}"))
116122
})?;
117123

124+
let active_jobs = register_gauge_with_registry!(
125+
"active_job_count",
126+
"Number of active jobs on the scheduler",
127+
registry
128+
)
129+
.map_err(|e| {
130+
BallistaError::Internal(format!("Error registering metric: {:?}", e))
131+
})?;
132+
118133
Ok(Self {
119134
execution_time,
120135
planning_time,
@@ -123,6 +138,7 @@ impl PrometheusMetricsCollector {
123138
completed,
124139
submitted,
125140
pending_queue_size,
141+
active_jobs,
126142
})
127143
}
128144

@@ -138,6 +154,10 @@ impl PrometheusMetricsCollector {
138154
}
139155

140156
impl SchedulerMetricsCollector for PrometheusMetricsCollector {
157+
fn record_queued(&self, _job_id: &str, _queued_at: u64) {
158+
self.active_jobs.inc()
159+
}
160+
141161
fn record_submitted(&self, _job_id: &str, queued_at: u64, submitted_at: u64) {
142162
self.submitted.inc();
143163
self.planning_time
@@ -146,15 +166,18 @@ impl SchedulerMetricsCollector for PrometheusMetricsCollector {
146166

147167
fn record_completed(&self, _job_id: &str, queued_at: u64, completed_at: u64) {
148168
self.completed.inc();
169+
self.active_jobs.dec();
149170
self.execution_time
150171
.observe((completed_at - queued_at) as f64 / 1000_f64)
151172
}
152173

153174
fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {
175+
self.active_jobs.dec();
154176
self.failed.inc()
155177
}
156178

157179
fn record_cancelled(&self, _job_id: &str) {
180+
self.active_jobs.dec();
158181
self.cancelled.inc();
159182
}
160183

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
104104
} => {
105105
info!("Job {} queued with name {:?}", job_id, job_name);
106106

107+
self.metrics_collector
108+
.record_queued(&job_id, timestamp_millis());
109+
107110
self.state
108111
.task_manager
109112
.queue_job(&job_id, &job_name, queued_at)

ballista/scheduler/src/test_utils.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ impl SchedulerTest {
673673

674674
#[derive(Clone)]
675675
pub enum MetricEvent {
676+
Queued(String, u64),
676677
Submitted(String, u64, u64),
677678
Completed(String, u64, u64),
678679
Cancelled(String),
@@ -682,6 +683,7 @@ pub enum MetricEvent {
682683
impl MetricEvent {
683684
pub fn job_id(&self) -> &str {
684685
match self {
686+
MetricEvent::Queued(job, _) => job.as_str(),
685687
MetricEvent::Submitted(job, _, _) => job.as_str(),
686688
MetricEvent::Completed(job, _, _) => job.as_str(),
687689
MetricEvent::Cancelled(job) => job.as_str(),
@@ -713,6 +715,11 @@ impl TestMetricsCollector {
713715
}
714716

715717
impl SchedulerMetricsCollector for TestMetricsCollector {
718+
fn record_queued(&self, job_id: &str, queued_at: u64) {
719+
let mut guard = self.events.lock();
720+
guard.push(MetricEvent::Queued(job_id.to_owned(), queued_at));
721+
}
722+
716723
fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at: u64) {
717724
let mut guard = self.events.lock();
718725
guard.push(MetricEvent::Submitted(
@@ -749,12 +756,18 @@ impl SchedulerMetricsCollector for TestMetricsCollector {
749756
}
750757

751758
pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
752-
let found = collector
759+
let queued = collector
760+
.job_events(job_id)
761+
.iter()
762+
.any(|ev| matches!(ev, MetricEvent::Queued(_, _)));
763+
764+
let submitted = collector
753765
.job_events(job_id)
754766
.iter()
755767
.any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
756768

757-
assert!(found, "{}", "Expected submitted event for job {job_id}");
769+
assert!(queued, "Expected queued event for job {}", job_id);
770+
assert!(submitted, "Expected submitted event for job {}", job_id);
758771
}
759772

760773
pub fn assert_no_submitted_event(job_id: &str, collector: &TestMetricsCollector) {

0 commit comments

Comments
 (0)