Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record how long a process takes to run #11594

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/rust/engine/concrete_time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ impl TimeSpan {
) -> Result<Self, String> {
let start = Self::std_duration_from_timestamp(start);
let end = Self::std_duration_from_timestamp(end);
let time_span = end.checked_sub(start).map(|duration| TimeSpan {
start: start.into(),
duration: duration.into(),
});
time_span.ok_or_else(|| {
format!(
match end.checked_sub(start) {
Some(duration) => Ok(TimeSpan {
start: start.into(),
duration: duration.into(),
}),
None => Err(format!(
"Got negative {} time: {:?} - {:?}",
time_span_description, end, start
)
})
)),
}
}
}

Expand Down
12 changes: 5 additions & 7 deletions src/rust/engine/concrete_time/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,17 @@ fn time_span_from_start_and_duration_in_seconds(
}

#[test]
fn time_span_from_start_and_end_given_positive_duration() {
let span = time_span_from_start_and_duration_in_seconds(42, 10);
fn time_span_from_prost_timestamp() {
let span = time_span_from_start_and_duration_in_seconds(42, 10).unwrap();
assert_eq!(
Ok(TimeSpan {
TimeSpan {
start: Duration::new(42, 0),
duration: Duration::new(10, 0),
}),
},
span
);
}

#[test]
fn time_span_from_start_and_end_given_negative_duration() {
// A negative duration is invalid.
let span = time_span_from_start_and_duration_in_seconds(42, -10);
assert!(span.is_err());
}
22 changes: 12 additions & 10 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,20 @@ impl CommandRunner {
let stdout_digest = result.stdout_digest;
let stderr_digest = result.stderr_digest;

let action_result = remexec::ActionResult {
exit_code: result.exit_code,
output_directories: vec![remexec::OutputDirectory {
path: String::new(),
tree_digest: Some((&result.output_directory).into()),
}],
stdout_digest: Some((&stdout_digest).into()),
stderr_digest: Some((&stderr_digest).into()),
execution_metadata: Some(result.metadata.clone().into()),
..remexec::ActionResult::default()
};
let execute_response = remexec::ExecuteResponse {
cached_result: true,
result: Some(remexec::ActionResult {
exit_code: result.exit_code,
output_directories: vec![remexec::OutputDirectory {
path: String::new(),
tree_digest: Some((&result.output_directory).into()),
}],
stdout_digest: Some((&stdout_digest).into()),
stderr_digest: Some((&stderr_digest).into()),
..remexec::ActionResult::default()
}),
result: Some(action_result),
..remexec::ExecuteResponse::default()
};

Expand Down
67 changes: 65 additions & 2 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
extern crate derivative;

use async_trait::async_trait;
use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
pub use log::Level;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -65,6 +67,7 @@ pub mod named_caches;
extern crate uname;

pub use crate::named_caches::{CacheDest, CacheName, NamedCaches};
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;

#[derive(PartialOrd, Ord, Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -341,13 +344,73 @@ pub struct ProcessMetadata {
///
/// The result of running a process.
///
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Derivative, Clone, Debug, Eq)]
#[derivative(PartialEq, Hash)]
pub struct FallibleProcessResultWithPlatform {
pub stdout_digest: Digest,
pub stderr_digest: Digest,
pub exit_code: i32,
pub platform: Platform,
pub output_directory: hashing::Digest,
pub platform: Platform,
#[derivative(PartialEq = "ignore", Hash = "ignore")]
pub metadata: ProcessResultMetadata,
}

/// Metadata for a ProcessResult corresponding to the REAPI `ExecutedActionMetadata` proto. This
/// conversion is lossy, but the interesting parts are preserved.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProcessResultMetadata {
/// The time from starting to completion, including preparing the chroot and cleanup.
/// Corresponds to `worker_start_timestamp` and `worker_completed_timestamp` from
/// `ExecutedActionMetadata`.
pub total_elapsed: Option<Duration>,
}

impl ProcessResultMetadata {
pub fn new(total_elapsed: Option<Duration>) -> Self {
ProcessResultMetadata { total_elapsed }
}
}

impl From<ExecutedActionMetadata> for ProcessResultMetadata {
fn from(metadata: ExecutedActionMetadata) -> Self {
let total_elapsed = match (
metadata.worker_start_timestamp,
metadata.worker_completed_timestamp,
) {
(Some(started), Some(completed)) => TimeSpan::from_start_and_end(&started, &completed, "")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string is used for Err, but we then throw that away with .ok() so I left it empty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a weird API... ideally it would return a non-allocated error type with a Debug/Display impl, rather than passing in a string to format. But yea, this is fine for now.

We might consider logging at debug for negative times though...

.map(|span| span.duration)
.ok(),
_ => None,
};
Self { total_elapsed }
}
}

impl Into<ExecutedActionMetadata> for ProcessResultMetadata {
fn into(self) -> ExecutedActionMetadata {
let (total_start, total_end) = match self.total_elapsed {
Some(elapsed) => {
// Because we do not have the precise start time, we hardcode to starting at UNIX_EPOCH. We
// only care about accurately preserving the duration.
let start = prost_types::Timestamp {
seconds: 0,
nanos: 0,
};
let end = prost_types::Timestamp {
seconds: elapsed.secs as i64,
nanos: elapsed.nanos as i32,
};
(Some(start), Some(end))
}
None => (None, None),
};
ExecutedActionMetadata {
worker_start_timestamp: total_start,
worker_completed_timestamp: total_end,
..ExecutedActionMetadata::default()
}
}
}

#[derive(Clone)]
Expand Down
9 changes: 9 additions & 0 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
Expand All @@ -32,6 +33,7 @@ use workunit_store::Metric;

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
ProcessResultMetadata,
};

pub const USER_EXECUTABLE_MODE: u32 = 0o100755;
Expand Down Expand Up @@ -400,6 +402,8 @@ pub trait CapturedWorkdir {
workdir_base: &Path,
platform: Platform,
) -> Result<FallibleProcessResultWithPlatform, String> {
let start_time = Instant::now();

// Set up a temporary workdir, which will optionally be preserved.
let (workdir_path, maybe_workdir) = {
let workdir = tempfile::Builder::new()
Expand Down Expand Up @@ -565,6 +569,9 @@ pub trait CapturedWorkdir {
}
}

let elapsed = start_time.elapsed();
let result_metadata = ProcessResultMetadata::new(Some(elapsed.into()));

match child_results_result {
Ok(child_results) => {
let stdout = child_results.stdout;
Expand All @@ -579,6 +586,7 @@ pub trait CapturedWorkdir {
exit_code: child_results.exit_code,
output_directory: output_snapshot.digest,
platform,
metadata: result_metadata,
})
}
Err(msg) if msg == "deadline has elapsed" => {
Expand All @@ -594,6 +602,7 @@ pub trait CapturedWorkdir {
exit_code: -libc::SIGTERM,
output_directory: hashing::EMPTY_DIGEST,
platform,
metadata: result_metadata,
})
}
Err(msg) => Err(msg),
Expand Down
10 changes: 7 additions & 3 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use workunit_store::{

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata,
ProcessCacheScope, ProcessMetadata, ProcessResultMetadata,
};
use grpc_util::headers_to_interceptor_fn;

Expand Down Expand Up @@ -1099,6 +1099,7 @@ pub async fn populate_fallible_execution_result_for_timeout(
exit_code: -libc::SIGTERM,
output_directory: hashing::EMPTY_DIGEST,
platform,
metadata: ProcessResultMetadata::default(),
})
}

Expand All @@ -1115,7 +1116,6 @@ pub fn populate_fallible_execution_result(
platform: Platform,
treat_tree_digest_as_final_directory_hack: bool,
) -> BoxFuture<Result<FallibleProcessResultWithPlatform, String>> {
let exit_code = action_result.exit_code;
future::try_join3(
extract_stdout(&store, action_result),
extract_stderr(&store, action_result),
Expand All @@ -1130,9 +1130,13 @@ pub fn populate_fallible_execution_result(
Ok(FallibleProcessResultWithPlatform {
stdout_digest,
stderr_digest,
exit_code,
exit_code: action_result.exit_code,
output_directory,
platform,
metadata: action_result
.execution_metadata
.clone()
.map_or(ProcessResultMetadata::default(), |metadata| metadata.into()),
})
},
)
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ impl CommandRunner {
exit_code: result.exit_code,
stdout_digest: Some(result.stdout_digest.into()),
stderr_digest: Some(result.stderr_digest.into()),
execution_metadata: Some(result.metadata.clone().into()),
..ActionResult::default()
};

Expand Down
6 changes: 4 additions & 2 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use workunit_store::WorkunitStore;
use crate::remote::{ensure_action_stored_locally, make_execute_request};
use crate::{
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform,
MultiPlatformProcess, Platform, Process, ProcessMetadata,
MultiPlatformProcess, Platform, Process, ProcessMetadata, ProcessResultMetadata,
};

/// A mock of the local runner used for better hermeticity of the tests.
Expand All @@ -45,6 +45,7 @@ impl MockLocalCommandRunner {
exit_code,
output_directory: EMPTY_DIGEST,
platform: Platform::current().unwrap(),
metadata: ProcessResultMetadata::default(),
}),
call_counter,
delay: Duration::from_millis(delay_ms),
Expand Down Expand Up @@ -560,9 +561,10 @@ async fn make_action_result_basic() {
let process_result = FallibleProcessResultWithPlatform {
stdout_digest: TestData::roland().digest(),
stderr_digest: TestData::robin().digest(),
output_directory: directory_digest,
exit_code: 102,
platform: Platform::Linux,
output_directory: directory_digest,
metadata: ProcessResultMetadata::default(),
};

let (action_result, digests) = runner
Expand Down
65 changes: 58 additions & 7 deletions src/rust/engine/process_execution/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use crate::Process;
// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

use crate::{Process, ProcessResultMetadata};
use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
use prost_types::Timestamp;
use remexec::ExecutedActionMetadata;

#[test]
fn process_equality() {
// TODO: Tests like these would be cleaner with the builder pattern for the rust-side Process API.
Expand All @@ -26,14 +33,58 @@ fn process_equality() {
let d = process_generator("One thing".to_string(), None);

// Process should derive a PartialEq and Hash that ignores the description
assert!(a == b);
assert!(hash(&a) == hash(&b));
assert_eq!(a, b);
assert_eq!(hash(&a), hash(&b));

// ..but not other fields.
assert!(a != c);
assert!(hash(&a) != hash(&c));
assert_ne!(a, c);
assert_ne!(hash(&a), hash(&c));

// Absence of timeout is included in hash.
assert!(a != d);
assert!(hash(&a) != hash(&d));
assert_ne!(a, d);
assert_ne!(hash(&a), hash(&d));
}

#[test]
fn process_result_metadata_to_and_from_executed_action_metadata() {
let action_metadata = ExecutedActionMetadata {
worker_start_timestamp: Some(Timestamp {
seconds: 100,
nanos: 20,
}),
worker_completed_timestamp: Some(Timestamp {
seconds: 120,
nanos: 50,
}),
..ExecutedActionMetadata::default()
};

let converted_process_result: ProcessResultMetadata = action_metadata.into();
assert_eq!(
converted_process_result,
ProcessResultMetadata::new(Some(concrete_time::Duration::new(20, 30)))
);

// The conversion from `ExecutedActionMetadata` to `ProcessResultMetadata` is lossy.
let restored_action_metadata: ExecutedActionMetadata = converted_process_result.into();
assert_eq!(
restored_action_metadata,
ExecutedActionMetadata {
worker_start_timestamp: Some(Timestamp {
seconds: 0,
nanos: 0,
}),
worker_completed_timestamp: Some(Timestamp {
seconds: 20,
nanos: 30,
}),
..ExecutedActionMetadata::default()
}
);

// The relevant metadata may be missing from either type.
let action_metadata_missing: ProcessResultMetadata = ExecutedActionMetadata::default().into();
assert_eq!(action_metadata_missing, ProcessResultMetadata::default());
let process_result_missing: ExecutedActionMetadata = ProcessResultMetadata::default().into();
assert_eq!(process_result_missing, ExecutedActionMetadata::default());
}