Skip to content

Commit b3f1e0e

Browse files
thinkharderdevfsdvh
authored and committed
Record when job is queued in scheduler metrics (#28)
* Record when job is queued in scheduler metrics
* Add additional buckets for exec times
1 parent 3e7ea57 commit b3f1e0e

File tree

4 files changed

+48
-4
lines changed

4 files changed

+48
-4
lines changed

ballista/scheduler/src/metrics/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ use std::sync::Arc;
2626
/// will be passed when constructing the `QueryStageScheduler` which is the core event loop of the scheduler.
2727
/// The event loop will then record metric events through this trait.
2828
pub trait SchedulerMetricsCollector: Send + Sync {
29+
/// Record that job with `job_id` was queued. This will be invoked
30+
/// when the job is queued for scheduling.
31+
/// When invoked should specify the timestamp in milliseconds when the job was queued
32+
fn record_queued(&self, job_id: &str, queued_at: u64);
2933
/// Record that job with `job_id` was submitted. This will be invoked
3034
/// after the job's `ExecutionGraph` is created and it is ready to be scheduled
3135
/// on executors.
@@ -62,6 +66,7 @@ pub trait SchedulerMetricsCollector: Send + Sync {
6266
pub struct NoopMetricsCollector {}
6367

6468
impl SchedulerMetricsCollector for NoopMetricsCollector {
69+
fn record_queued(&self, _job_id: &str, _queued_at: u64) {}
6570
fn record_submitted(&self, _job_id: &str, _queued_at: u64, _submitted_at: u64) {}
6671
fn record_completed(&self, _job_id: &str, _queued_at: u64, _completed_att: u64) {}
6772
fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {}

ballista/scheduler/src/metrics/prometheus.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,17 @@ pub struct PrometheusMetricsCollector {
4646
completed: Counter,
4747
submitted: Counter,
4848
pending_queue_size: Gauge,
49+
active_jobs: Gauge,
4950
}
5051

5152
impl PrometheusMetricsCollector {
5253
pub fn new(registry: &Registry) -> Result<Self> {
5354
let execution_time = register_histogram_with_registry!(
5455
"job_exec_time_seconds",
5556
"Histogram of successful job execution time in seconds",
56-
vec![0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64],
57+
vec![
58+
0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64, 120_f64, 180_f64, 240_f64, 300_f64
59+
],
5760
registry
5861
)
5962
.map_err(|e| {
@@ -63,7 +66,10 @@ impl PrometheusMetricsCollector {
6366
let planning_time = register_histogram_with_registry!(
6467
"planning_time_ms",
6568
"Histogram of job planning time in milliseconds",
66-
vec![1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 500.0_f64],
69+
vec![
70+
1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 200.0_f64, 300_f64, 400_f64,
71+
500_f64, 1_000_f64, 2_000_f64, 3_000_f64
72+
],
6773
registry
6874
)
6975
.map_err(|e| {
@@ -115,6 +121,15 @@ impl PrometheusMetricsCollector {
115121
BallistaError::Internal(format!("Error registering metric: {:?}", e))
116122
})?;
117123

124+
let active_jobs = register_gauge_with_registry!(
125+
"active_job_count",
126+
"Number of active jobs on the scheduler",
127+
registry
128+
)
129+
.map_err(|e| {
130+
BallistaError::Internal(format!("Error registering metric: {:?}", e))
131+
})?;
132+
118133
Ok(Self {
119134
execution_time,
120135
planning_time,
@@ -123,6 +138,7 @@ impl PrometheusMetricsCollector {
123138
completed,
124139
submitted,
125140
pending_queue_size,
141+
active_jobs,
126142
})
127143
}
128144

@@ -138,6 +154,10 @@ impl PrometheusMetricsCollector {
138154
}
139155

140156
impl SchedulerMetricsCollector for PrometheusMetricsCollector {
157+
fn record_queued(&self, _job_id: &str, _queued_at: u64) {
158+
self.active_jobs.inc()
159+
}
160+
141161
fn record_submitted(&self, _job_id: &str, queued_at: u64, submitted_at: u64) {
142162
self.submitted.inc();
143163
self.planning_time
@@ -146,15 +166,18 @@ impl SchedulerMetricsCollector for PrometheusMetricsCollector {
146166

147167
fn record_completed(&self, _job_id: &str, queued_at: u64, completed_at: u64) {
148168
self.completed.inc();
169+
self.active_jobs.dec();
149170
self.execution_time
150171
.observe((completed_at - queued_at) as f64 / 1000_f64)
151172
}
152173

153174
fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {
175+
self.active_jobs.dec();
154176
self.failed.inc()
155177
}
156178

157179
fn record_cancelled(&self, _job_id: &str) {
180+
self.active_jobs.dec();
158181
self.cancelled.inc();
159182
}
160183

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

+3
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
101101
} => {
102102
info!("Job {} queued with name {:?}", job_id, job_name);
103103

104+
self.metrics_collector
105+
.record_queued(&job_id, timestamp_millis());
106+
104107
let state = self.state.clone();
105108
tokio::spawn(async move {
106109
let event = if let Err(e) = state

ballista/scheduler/src/test_utils.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ impl SchedulerTest {
598598

599599
#[derive(Clone)]
600600
pub enum MetricEvent {
601+
Queued(String, u64),
601602
Submitted(String, u64, u64),
602603
Completed(String, u64, u64),
603604
Cancelled(String),
@@ -607,6 +608,7 @@ pub enum MetricEvent {
607608
impl MetricEvent {
608609
pub fn job_id(&self) -> &str {
609610
match self {
611+
MetricEvent::Queued(job, _) => job.as_str(),
610612
MetricEvent::Submitted(job, _, _) => job.as_str(),
611613
MetricEvent::Completed(job, _, _) => job.as_str(),
612614
MetricEvent::Cancelled(job) => job.as_str(),
@@ -638,6 +640,11 @@ impl TestMetricsCollector {
638640
}
639641

640642
impl SchedulerMetricsCollector for TestMetricsCollector {
643+
fn record_queued(&self, job_id: &str, queued_at: u64) {
644+
let mut guard = self.events.lock();
645+
guard.push(MetricEvent::Queued(job_id.to_owned(), queued_at));
646+
}
647+
641648
fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at: u64) {
642649
let mut guard = self.events.lock();
643650
guard.push(MetricEvent::Submitted(
@@ -674,12 +681,18 @@ impl SchedulerMetricsCollector for TestMetricsCollector {
674681
}
675682

676683
pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
677-
let found = collector
684+
let queued = collector
685+
.job_events(job_id)
686+
.iter()
687+
.any(|ev| matches!(ev, MetricEvent::Queued(_, _)));
688+
689+
let submitted = collector
678690
.job_events(job_id)
679691
.iter()
680692
.any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
681693

682-
assert!(found, "Expected submitted event for job {}", job_id);
694+
assert!(queued, "Expected queued event for job {}", job_id);
695+
assert!(submitted, "Expected submitted event for job {}", job_id);
683696
}
684697

685698
pub fn assert_no_submitted_event(job_id: &str, collector: &TestMetricsCollector) {

0 commit comments

Comments
 (0)