diff --git a/src/python/pants/bin/local_pants_runner.py b/src/python/pants/bin/local_pants_runner.py index 78f4db7ad37..59f71036777 100644 --- a/src/python/pants/bin/local_pants_runner.py +++ b/src/python/pants/bin/local_pants_runner.py @@ -89,7 +89,7 @@ def parse_options(args, env, options_bootstrapper=None): return options, build_config, options_bootstrapper @staticmethod - def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, options): + def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, global_options): if graph_session: return graph_session @@ -101,9 +101,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, build_config ) - v2_ui = options.for_global_scope().v2_ui - zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2 - return graph_scheduler_helper.new_session(zipkin_trace_v2, v2_ui) + return graph_scheduler_helper.new_session(global_options.v2_ui) @staticmethod def _maybe_init_target_roots(target_roots, graph_session, options, build_root): @@ -158,7 +156,7 @@ def create(cls, exiter, args, env, target_roots=None, daemon_graph_session=None, daemon_graph_session, options_bootstrapper, build_config, - options + global_options ) target_roots = cls._maybe_init_target_roots( diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 611f71b9da6..8f6cf992790 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -747,8 +747,8 @@ def new_execution_request(self): self.lib.execution_request_create(), self.lib.execution_request_destroy) - def new_session(self, scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count): - return self.gc(self.lib.session_create(scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count), self.lib.session_destroy) + def new_session(self, scheduler, should_render_ui, ui_worker_count): + return self.gc(self.lib.session_create(scheduler, should_render_ui, ui_worker_count), self.lib.session_destroy) def new_scheduler(self, tasks, diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 8afc9a0016a..ad8db13a1a3 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -343,11 +343,9 @@ def lease_files_in_graph(self): def garbage_collect_store(self): self._native.lib.garbage_collect_store(self._scheduler) - def new_session(self, zipkin_trace_v2, v2_ui=False): + def new_session(self, v2_ui=False): """Creates a new SchedulerSession for this Scheduler.""" - return SchedulerSession(self, self._native.new_session( - self._scheduler, zipkin_trace_v2, v2_ui, multiprocessing.cpu_count()) - ) + return SchedulerSession(self, self._native.new_session(self._scheduler, v2_ui, multiprocessing.cpu_count())) _PathGlobsAndRootCollection = Collection.of(PathGlobsAndRoot) @@ -435,10 +433,6 @@ def metrics(self): """Returns metrics for this SchedulerSession as a dict of metric name to metric value.""" return self._scheduler._metrics(self._session) - @staticmethod - def engine_workunits(metrics): - return metrics.get("engine_workunits") - def with_fork_context(self, func): return self._scheduler.with_fork_context(func) diff --git a/src/python/pants/goal/context.py b/src/python/pants/goal/context.py index 7da1e41a2be..0396d33bc24 100644 --- a/src/python/pants/goal/context.py +++ b/src/python/pants/goal/context.py @@ -156,11 +156,7 @@ def executing(self): """A contextmanager that sets metrics in the context of a (v1) engine execution.""" self._set_target_root_count_in_runtracker() yield - metrics = self._scheduler.metrics() - self.run_tracker.pantsd_stats.set_scheduler_metrics(metrics) - engine_workunits = self._scheduler.engine_workunits(metrics) - if engine_workunits: - self.run_tracker.report.bulk_record_workunits(engine_workunits) + self.run_tracker.pantsd_stats.set_scheduler_metrics(self._scheduler.metrics()) self._set_affected_target_count_in_runtracker() def _set_target_root_count_in_runtracker(self): diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index c65e8ac2b4c..fa08acfa708 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -154,8 +154,8 @@ class GlobsHandlingTargetAdaptor(base_class): class LegacyGraphScheduler(datatype(['scheduler', 'build_file_aliases', 'goal_map'])): """A thin wrapper around a Scheduler configured with @rules for a symbol table.""" - def new_session(self, zipkin_trace_v2, v2_ui=False): - session = self.scheduler.new_session(zipkin_trace_v2, v2_ui) + def new_session(self, v2_ui=False): + session = self.scheduler.new_session(v2_ui) return LegacyGraphSession(session, self.build_file_aliases, self.goal_map) diff --git a/src/python/pants/pantsd/service/scheduler_service.py b/src/python/pants/pantsd/service/scheduler_service.py index ff02e2ab822..4102d8a9b96 100644 --- a/src/python/pants/pantsd/service/scheduler_service.py +++ b/src/python/pants/pantsd/service/scheduler_service.py @@ -180,8 +180,7 @@ def prefork(self, options, options_bootstrapper): self._logger.debug('graph len was {}, waiting for initial watchman event'.format(graph_len)) self._watchman_is_running.wait() v2_ui = options.for_global_scope().v2_ui - zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2 - session = self._graph_helper.new_session(zipkin_trace_v2, v2_ui) + session = self._graph_helper.new_session(v2_ui) if options.for_global_scope().loop: prefork_fn = self._prefork_loop diff --git a/src/python/pants/reporting/report.py b/src/python/pants/reporting/report.py index b981d444931..520991ac765 100644 --- a/src/python/pants/reporting/report.py +++ b/src/python/pants/reporting/report.py @@ -140,8 +140,3 @@ def _notify(self): if len(s) > 0: for reporter in self._reporters.values(): reporter.handle_output(workunit, label, s) - - def bulk_record_workunits(self, engine_workunits): - with self._lock: - for reporter in self._reporters.values(): - reporter.bulk_record_workunits(engine_workunits) diff --git a/src/python/pants/reporting/reporter.py b/src/python/pants/reporting/reporter.py index ba165084dac..5f135952ca5 100644 --- a/src/python/pants/reporting/reporter.py +++ b/src/python/pants/reporting/reporter.py @@ -47,10 +47,6 @@ def end_workunit(self, workunit): """A workunit has finished.""" pass - def bulk_record_workunits(self, engine_workunits): - """A collection of workunits from v2 engine part""" - pass - def handle_log(self, workunit, level, *msg_elements): """Handle a message logged by pants code. diff --git a/src/python/pants/reporting/reporting.py b/src/python/pants/reporting/reporting.py index 88ca1a5faf5..6d1860ee67c 100644 --- a/src/python/pants/reporting/reporting.py +++ b/src/python/pants/reporting/reporting.py @@ -62,8 +62,6 @@ def register_options(cls, register): 'or not set when running a Pants command.') register('--zipkin-sample-rate', advanced=True, default=100.0, help='Rate at which to sample Zipkin traces. Value 0.0 - 100.0.') - register('--zipkin-trace-v2', advanced=True, type=bool, default=False, - help='If enabled, the zipkin spans are tracked for v2 engine execution progress.') def initialize(self, run_tracker, all_options, start_time=None): """Initialize with the given RunTracker. diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 557b338c44c..5272dfa5ffd 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -7,7 +7,7 @@ import logging import requests -from py_zipkin import Encoding, storage +from py_zipkin import Encoding from py_zipkin.transport import BaseTransportHandler from py_zipkin.util import generate_random_64bit_string from py_zipkin.zipkin import ZipkinAttrs, create_attrs_for_span, zipkin_span @@ -64,7 +64,6 @@ def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_ self.parent_id = parent_id self.sample_rate = float(sample_rate) self.endpoint = endpoint - self.span_storage = storage.default_span_storage() def start_workunit(self, workunit): """Implementation of Reporter callback.""" @@ -97,24 +96,19 @@ def start_workunit(self, workunit): sample_rate=self.sample_rate, # Value between 0.0 and 100.0 ) self.trace_id = zipkin_attrs.trace_id - # TODO delete this line when parent_id will be passed in v2 engine: - # - with ExecutionRequest when Nodes from v2 engine are called by a workunit; - # - when a v2 engine Node is called by another v2 engine Node. - self.parent_id = zipkin_attrs.span_id + span = zipkin_span( service_name=service_name, span_name=workunit.name, transport_handler=self.handler, encoding=Encoding.V1_THRIFT, - zipkin_attrs=zipkin_attrs, - span_storage=self.span_storage, + zipkin_attrs=zipkin_attrs ) else: span = zipkin_span( service_name=service_name, span_name=workunit.name, - span_storage=self.span_storage, ) self._workunits_to_spans[workunit] = span span.start() @@ -135,28 +129,3 @@ def close(self): endpoint = self.endpoint.replace("/api/v1/spans", "") logger.debug("Zipkin trace may be located at this URL {}/traces/{}".format(endpoint, self.trace_id)) - - def bulk_record_workunits(self, engine_workunits): - """A collection of workunits from v2 engine part""" - for workunit in engine_workunits: - duration = workunit['end_timestamp'] - workunit['start_timestamp'] - - span = zipkin_span( - service_name="pants", - span_name=workunit['name'], - duration=duration, - span_storage=self.span_storage, - ) - span.zipkin_attrs = ZipkinAttrs( - trace_id=self.trace_id, - span_id=workunit['span_id'], - # TODO change it when we properly pass parent_id to the v2 engine Nodes - # TODO Pass parent_id with ExecutionRequest when v2 engine is called by a workunit - # TODO pass parent_id when v2 engine Node is called by another v2 engine Node - parent_span_id=workunit.get("parent_id", self.parent_id), - flags='0', # flags: stores flags header. Currently unused - is_sampled=True, - ) - span.start() - span.start_timestamp = workunit['start_timestamp'] - span.stop() diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index f79055b9b1c..ffac28bcd61 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -16,7 +16,6 @@ use crate::core::{Failure, TypeId}; use crate::handles::maybe_drop_handles; use crate::nodes::{NodeKey, WrappedNode}; use crate::rule_graph::RuleGraph; -use crate::scheduler::Session; use crate::tasks::Tasks; use crate::types::Types; use boxfuture::{BoxFuture, Boxable}; @@ -275,15 +274,13 @@ impl Core { pub struct Context { pub entry_id: EntryId, pub core: Arc, - pub session: Session, } impl Context { - pub fn new(entry_id: EntryId, core: Arc, session: Session) -> Context { + pub fn new(entry_id: EntryId, core: Arc) -> Context { Context { entry_id: entry_id, core: core, - session: session, } } @@ -317,7 +314,6 @@ impl NodeContext for Context { Context { entry_id: entry_id, core: self.core.clone(), - session: self.session.clone(), } } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 079bb0305ea..8f61f6491c5 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -337,33 +337,11 @@ pub extern "C" fn scheduler_metrics( ) -> Handle { with_scheduler(scheduler_ptr, |scheduler| { with_session(session_ptr, |session| { - let mut values = scheduler + let values = scheduler .metrics(session) .into_iter() .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) .collect::>(); - if session.should_record_zipkin_spans() { - let workunits = session - .get_workunits() - .lock() - .iter() - .map(|workunit| { - let workunit_zipkin_trace_info = vec![ - externs::store_utf8("name"), - externs::store_utf8(&workunit.name), - externs::store_utf8("start_timestamp"), - externs::store_f64(workunit.start_timestamp), - externs::store_utf8("end_timestamp"), - externs::store_f64(workunit.end_timestamp), - externs::store_utf8("span_id"), - externs::store_utf8(&workunit.span_id), - ]; - externs::store_dict(&workunit_zipkin_trace_info) - }) - .collect::>(); - values.push(externs::store_utf8("engine_workunits")); - values.push(externs::store_tuple(&workunits)); - }; externs::store_dict(&values).into() }) }) @@ -569,14 +547,12 @@ pub extern "C" fn nodes_destroy(raw_nodes_ptr: *mut RawNodes) { #[no_mangle] pub extern "C" fn session_create( scheduler_ptr: *mut Scheduler, - should_record_zipkin_spans: bool, should_render_ui: bool, ui_worker_count: u64, ) -> *const Session { with_scheduler(scheduler_ptr, |scheduler| { Box::into_raw(Box::new(Session::new( scheduler, - should_record_zipkin_spans, should_render_ui, ui_worker_count as usize, ))) diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 71dfae528ba..b6328147865 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -29,10 +29,7 @@ use fs::{ use hashing; use process_execution::{self, CommandRunner}; -use crate::scheduler::WorkUnit; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; -use rand::thread_rng; -use rand::Rng; pub type NodeFuture = BoxFuture; @@ -1088,17 +1085,6 @@ impl Node for NodeKey { type Error = Failure; fn run(self, context: Context) -> NodeFuture { - let node_name_and_start_timestamp = if context.session.should_record_zipkin_spans() { - let node_name = format!("{}", self); - let start_timestamp_duration = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap(); - let start_timestamp = duration_as_float_secs(&start_timestamp_duration); - Some((node_name, start_timestamp)) - } else { - None - }; - let context2 = context.clone(); match self { NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(), NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(), @@ -1109,22 +1095,6 @@ impl Node for NodeKey { NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(), NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(), } - .inspect(move |_: &NodeResult| { - if let Some((node_name, start_timestamp)) = node_name_and_start_timestamp { - let end_timestamp_duration = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap(); - let end_timestamp = duration_as_float_secs(&end_timestamp_duration); - let workunit = WorkUnit { - name: node_name, - start_timestamp: start_timestamp, - end_timestamp: end_timestamp, - span_id: generate_random_64bit_string(), - }; - context2.session.add_workunit(workunit) - }; - }) - .to_boxed() } fn digest(res: NodeResult) -> Option { @@ -1148,21 +1118,6 @@ impl Node for NodeKey { } } -fn duration_as_float_secs(duration: &Duration) -> f64 { - // Returning value is formed by representing duration as a hole number of seconds (u64) plus - // a hole number of microseconds (u32) turned into a f64 type. - // Reverting time from duration to f64 decrease precision. - let whole_secs_in_duration = duration.as_secs() as f64; - let fract_part_of_duration_in_micros = f64::from(duration.subsec_micros()); - whole_secs_in_duration + fract_part_of_duration_in_micros / 1_000_000.0 -} - -fn generate_random_64bit_string() -> String { - let mut rng = thread_rng(); - let random_u64: u64 = rng.gen(); - format!("{:16.x}", random_u64) -} - impl Display for NodeKey { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index eea2fad1af8..ffbdba198dd 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -27,75 +27,33 @@ use ui::EngineDisplay; /// Both Scheduler and Session are exposed to python and expected to be used by multiple threads, so /// they use internal mutability in order to avoid exposing locks to callers. /// -struct InnerSession { +pub struct Session { // The total size of the graph at Session-creation time. preceding_graph_size: usize, // The set of roots that have been requested within this session. roots: Mutex>, // If enabled, the display that will render the progress of the V2 engine. display: Option>, - // If enabled, Zipkin spans for v2 engine will be collected. - should_record_zipkin_spans: bool, - // A place to store info about workunits in rust part - workunits: Mutex>, -} - -#[derive(Clone)] -pub struct Session(Arc); - -pub struct WorkUnit { - pub name: String, - pub start_timestamp: f64, - pub end_timestamp: f64, - pub span_id: String, } impl Session { - pub fn new( - scheduler: &Scheduler, - should_record_zipkin_spans: bool, - should_render_ui: bool, - ui_worker_count: usize, - ) -> Session { - let inner_session = InnerSession { + pub fn new(scheduler: &Scheduler, should_render_ui: bool, ui_worker_count: usize) -> Session { + Session { preceding_graph_size: scheduler.core.graph.len(), roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), - should_record_zipkin_spans: should_record_zipkin_spans, - workunits: Mutex::new(Vec::new()), - }; - Session(Arc::new(inner_session)) + } } fn extend(&self, new_roots: &[Root]) { - let mut roots = self.0.roots.lock(); + let mut roots = self.roots.lock(); roots.extend(new_roots.iter().cloned()); } fn root_nodes(&self) -> Vec { - let roots = self.0.roots.lock(); + let roots = self.roots.lock(); roots.iter().map(|r| r.clone().into()).collect() } - - pub fn preceding_graph_size(&self) -> usize { - self.0.preceding_graph_size - } - - pub fn display(&self) -> &Option> { - &self.0.display - } - - pub fn should_record_zipkin_spans(&self) -> bool { - self.0.should_record_zipkin_spans - } - - pub fn get_workunits(&self) -> &Mutex> { - &self.0.workunits - } - - pub fn add_workunit(&self, workunit: WorkUnit) { - self.0.workunits.lock().push(workunit); - } } pub struct ExecutionRequest { @@ -211,10 +169,7 @@ impl Scheduler { .graph .reachable_digest_count(&session.root_nodes()) as i64, ); - m.insert( - "preceding_graph_size", - session.preceding_graph_size() as i64, - ); + m.insert("preceding_graph_size", session.preceding_graph_size as i64); m.insert("resulting_graph_size", self.core.graph.len() as i64); m } @@ -294,7 +249,6 @@ impl Scheduler { // individual Future in the join was (eventually) mapped into success. let context = RootContext { core: self.core.clone(), - session: session.clone(), }; let (sender, receiver) = mpsc::channel(); @@ -307,7 +261,7 @@ impl Scheduler { .collect(); // Lock the display for the remainder of the execution, and grab a reference to it. - let mut maybe_display = match &session.display() { + let mut maybe_display = match &session.display { &Some(ref d) => Some(d.lock()), &None => None, }; @@ -390,14 +344,13 @@ pub type RootResult = Result; #[derive(Clone)] struct RootContext { core: Arc, - session: Session, } impl NodeContext for RootContext { type Node = NodeKey; fn clone_for(&self, entry_id: EntryId) -> Context { - Context::new(entry_id, self.core.clone(), self.session.clone()) + Context::new(entry_id, self.core.clone()) } fn graph(&self) -> &Graph { diff --git a/tests/python/pants_test/engine/scheduler_test_base.py b/tests/python/pants_test/engine/scheduler_test_base.py index 11aa427380a..675bdec4aaa 100644 --- a/tests/python/pants_test/engine/scheduler_test_base.py +++ b/tests/python/pants_test/engine/scheduler_test_base.py @@ -63,7 +63,7 @@ def mk_scheduler(self, union_rules, DEFAULT_EXECUTION_OPTIONS, include_trace_on_error=include_trace_on_error) - return scheduler.new_session(zipkin_trace_v2=False) + return scheduler.new_session() def context_with_scheduler(self, scheduler, *args, **kwargs): return self.context(*args, scheduler=scheduler, **kwargs) diff --git a/tests/python/pants_test/reporting/test_reporting_integration.py b/tests/python/pants_test/reporting/test_reporting_integration.py index 35fee25c901..c09303aedf5 100644 --- a/tests/python/pants_test/reporting/test_reporting_integration.py +++ b/tests/python/pants_test/reporting/test_reporting_integration.py @@ -237,30 +237,6 @@ def test_zipkin_reporter_with_zero_sample_rate(self): num_of_traces = len(ZipkinHandler.traces) self.assertEqual(num_of_traces, 0) - def test_zipkin_reporter_for_v2_engine(self): - ZipkinHandler = zipkin_handler() - with http_server(ZipkinHandler) as port: - endpoint = "http://localhost:{}".format(port) - command = [ - '--reporting-zipkin-endpoint={}'.format(endpoint), - '--reporting-zipkin-trace-v2', - 'cloc', - 'src/python/pants:version' - ] - - pants_run = self.run_pants(command) - self.assert_success(pants_run) - - num_of_traces = len(ZipkinHandler.traces) - self.assertEqual(num_of_traces, 1) - - trace = ZipkinHandler.traces[-1] - v2_span_name_part = "Scandir" - self.assertTrue(any(v2_span_name_part in span['name'] for span in trace), - "There is no span that contains '{}' in it's name. The trace:{}".format( - v2_span_name_part, trace - )) - @staticmethod def find_spans_by_name(trace, name): return [span for span in trace if span['name'] == name] diff --git a/tests/python/pants_test/test_base.py b/tests/python/pants_test/test_base.py index cbba5111392..add9fe3f4ac 100644 --- a/tests/python/pants_test/test_base.py +++ b/tests/python/pants_test/test_base.py @@ -400,7 +400,7 @@ def _init_engine(cls): options_bootstrapper=OptionsBootstrapper.create(args=['--pants-config-files=[]']), build_configuration=cls.build_config(), build_ignore_patterns=None, - ).new_session(zipkin_trace_v2=False) + ).new_session() cls._scheduler = graph_session.scheduler_session cls._build_graph, cls._address_mapper = graph_session.create_build_graph( TargetRoots([]), cls._build_root()