Skip to content

Commit

Permalink
Get --loop working atop polling. Adjust the meaning of Graph::Node::SessionId, and rename to RunId to avoid overloading with the engine's Session.
Browse files Browse the repository at this point in the history

[ci skip-jvm-tests]
  • Loading branch information
Stu Hood committed May 2, 2020
1 parent 4fc3033 commit faa4eb4
Show file tree
Hide file tree
Showing 21 changed files with 261 additions and 165 deletions.
9 changes: 7 additions & 2 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ ignore_pants_warnings = [
"DEPRECATED: the antlr, jaxb, ragel, and wire codegen backends",
]

# The pants script in this repo consumes these files to run pants
pantsd_invalidation_globs.add = ["src/python/**/*.py"]
# The invalidation globs cover the PYTHONPATH by default, but we additionally add the rust code.
pantsd_invalidation_globs.add = [
"!*_test.py",
# NB: The `target` directory is ignored via `pants_ignore` below.
"src/rust/engine/**/*.rs",
"src/rust/engine/**/*.toml",
]
# Path patterns to ignore for filesystem operations on top of the builtin patterns.
pants_ignore.add = [
# venv directories under build-support.
Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def _init_graph_session(
)

v2_ui = options.for_global_scope().get("v2_ui", False)
use_colors = options.for_global_scope().get("colors", True)
zipkin_trace_v2 = options.for_scope("reporting").zipkin_trace_v2
# TODO(#8658) This should_report_workunits flag must be set to True for
# StreamingWorkunitHandler to receive WorkUnits. It should eventually
Expand All @@ -159,7 +160,8 @@ def _init_graph_session(
return graph_scheduler_helper.new_session(
zipkin_trace_v2,
RunTracker.global_instance().run_id,
v2_ui,
v2_ui=v2_ui,
use_colors=use_colors,
should_report_workunits=stream_workunits,
)

Expand Down
31 changes: 27 additions & 4 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ def execution_set_timeout(self, execution_request, timeout: float):
def execution_set_poll(self, execution_request, poll: bool):
    """Set the polling flag on a native execution request.

    :param execution_request: The native execution request to configure.
    :param poll: The flag value, forwarded unchanged to the native library.
    """
    native_lib = self._native.lib
    native_lib.execution_set_poll(execution_request, poll)

def execution_set_poll_delay(self, execution_request, poll_delay: float):
    """Set the poll delay on a native execution request.

    :param execution_request: The native execution request to configure.
    :param poll_delay: The delay in seconds. It is converted to whole milliseconds
      (any fractional millisecond is truncated) before being passed to the native library.
    """
    # The native side is given milliseconds; int() truncates toward zero.
    millis = int(poll_delay * 1000)
    self._native.lib.execution_set_poll_delay(execution_request, millis)

@property
def visualize_to_dir(self):
    """Return the value stored in `self._visualize_to_dir`."""
    target_dir = self._visualize_to_dir
    return target_dir
Expand Down Expand Up @@ -380,8 +384,6 @@ class SchedulerSession:
Session.
"""

execution_error_type = ExecutionError

def __init__(self, scheduler, session):
self._scheduler = scheduler
self._session = session
Expand All @@ -397,6 +399,15 @@ def poll_workunits(self) -> PolledWorkunits:
def graph_len(self):
    """Return the graph length reported by the underlying scheduler."""
    scheduler = self._scheduler
    return scheduler.graph_len()

def new_run_id(self):
    """Give this Session a fresh "run id" while keeping the same Session.

    A Session normally corresponds to a single end user "run". The `--loop` feature is an
    exception: it reuses one Session across iterations, but would like to observe new values
    for uncacheable nodes on each iteration of its loop.
    """
    native_lib = self._scheduler._native.lib
    native_lib.session_new_run_id(self._session)

def trace(self, execution_request):
"""Yields a stringified 'stacktrace' starting from the scheduler's roots."""
for line in self._scheduler.graph_trace(self._session, execution_request.native):
Expand All @@ -417,6 +428,7 @@ def execution_request(
products: Sequence[Type],
subjects: Sequence[Union[Any, Params]],
poll: bool = False,
poll_delay: Optional[float] = None,
timeout: Optional[float] = None,
) -> ExecutionRequest:
"""Create and return an ExecutionRequest for the given products and subjects.
Expand All @@ -430,6 +442,8 @@ def execution_request(
:param subjects: A list of singleton input parameters or Params instances.
:param poll: True to wait for _all_ of the given roots to
have changed since their last observed values in this SchedulerSession.
:param poll_delay: A delay (in seconds) to wait after observing a change, and before
beginning to compute a new value.
:param timeout: An optional timeout to wait for the request to complete (in seconds). If the
request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
:returns: An ExecutionRequest for the given products and subjects.
Expand All @@ -440,6 +454,8 @@ def execution_request(
self._scheduler.execution_add_root_select(native_execution_request, subject, product)
if timeout:
self._scheduler.execution_set_timeout(native_execution_request, timeout)
if poll_delay:
self._scheduler.execution_set_poll_delay(native_execution_request, poll_delay)
self._scheduler.execution_set_poll(native_execution_request, poll)
return ExecutionRequest(request_specs, native_execution_request)

Expand Down Expand Up @@ -518,11 +534,18 @@ def _trace_on_error(self, unique_exceptions, request):
unique_exceptions,
)

def run_goal_rule(self, product: Type, subject: Union[Any, Params], poll: bool = False) -> int:
def run_goal_rule(
self,
product: Type,
subject: Union[Any, Params],
poll: bool = False,
poll_delay: Optional[float] = None,
) -> int:
"""
:param product: A Goal subtype.
:param subject: subject for the request.
:param poll: See self.execution_request.
:param poll_delay: See self.execution_request.
:returns: An exit_code for the given Goal.
"""
if self._scheduler.visualize_to_dir is not None:
Expand All @@ -534,7 +557,7 @@ def run_goal_rule(self, product: Type, subject: Union[Any, Params], poll: bool =
product,
)

request = self.execution_request([product], [subject], poll=poll)
request = self.execution_request([product], [subject], poll=poll, poll_delay=poll_delay)
returns, throws = self.execute(request)

if throws:
Expand Down
31 changes: 20 additions & 11 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,26 @@ class LegacyGraphScheduler:
goal_map: Any

def new_session(
self, zipkin_trace_v2, build_id, v2_ui=False, should_report_workunits=False
self,
zipkin_trace_v2,
build_id,
v2_ui=False,
use_colors=True,
should_report_workunits=False,
) -> "LegacyGraphSession":
session = self.scheduler.new_session(
zipkin_trace_v2, build_id, v2_ui, should_report_workunits
)
return LegacyGraphSession(session, self.build_file_aliases, self.goal_map)
console = Console(use_colors=use_colors, session=session if v2_ui else None,)
return LegacyGraphSession(session, console, self.build_file_aliases, self.goal_map)


@dataclass(frozen=True)
class LegacyGraphSession:
"""A thin wrapper around a SchedulerSession configured with @rules for a symbol table."""

scheduler_session: SchedulerSession
console: Console
build_file_aliases: Any
goal_map: Any

Expand All @@ -212,6 +219,8 @@ def run_goal_rules(
options: Options,
goals: Iterable[str],
specs: Specs,
poll: bool = False,
poll_delay: Optional[float] = None,
) -> int:
"""Runs @goal_rules sequentially and interactively by requesting their implicit Goal
products.
Expand All @@ -221,12 +230,6 @@ def run_goal_rules(
:returns: An exit code.
"""

global_options = options.for_global_scope()

console = Console(
use_colors=global_options.colors,
session=self.scheduler_session if global_options.get("v2_ui") else None,
)
workspace = Workspace(self.scheduler_session)
interactive_runner = InteractiveRunner(self.scheduler_session)

Expand All @@ -242,13 +245,19 @@ def run_goal_rules(
if not is_implemented:
continue
params = Params(
specs.provided_specs, options_bootstrapper, console, workspace, interactive_runner,
specs.provided_specs,
options_bootstrapper,
self.console,
workspace,
interactive_runner,
)
logger.debug(f"requesting {goal_product} to satisfy execution of `{goal}` goal")
try:
exit_code = self.scheduler_session.run_goal_rule(goal_product, params)
exit_code = self.scheduler_session.run_goal_rule(
goal_product, params, poll=poll, poll_delay=poll_delay
)
finally:
console.flush()
self.console.flush()

if exit_code != PANTS_SUCCEEDED_EXIT_CODE:
return exit_code
Expand Down
34 changes: 22 additions & 12 deletions src/python/pants/init/options_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,21 @@ def compute_pants_ignore(buildroot, global_options):
"""
pants_ignore = list(global_options.pants_ignore)

def add_ignore(absolute_path):
def add(absolute_path, include=False):
# To ensure that the path is ignored regardless of whether it is a symlink or a directory, we
# strip trailing slashes (which would signal that we wanted to ignore only directories).
maybe_rel_path = fast_relpath_optional(absolute_path, buildroot)
if maybe_rel_path:
rel_path = maybe_rel_path.rstrip(os.path.sep)
pants_ignore.append(f"/{rel_path}")
prefix = "!" if include else ""
pants_ignore.append(f"{prefix}/{rel_path}")

add(global_options.pants_workdir)
add(global_options.pants_distdir)
# NB: We punch a hole in the ignore patterns to allow pants to directly watch process
# metadata that is written to disk.
add(global_options.pants_subprocessdir, include=True)

add_ignore(global_options.pants_workdir)
add_ignore(global_options.pants_distdir)
return pants_ignore

@staticmethod
Expand All @@ -126,23 +131,28 @@ def compute_pantsd_invalidation_globs(buildroot, bootstrap_options):
Combines --pythonpath and --pants-config-files files that are in {buildroot} dir with those
invalidation_globs provided by users.
"""
invalidation_globs = []
globs = (
bootstrap_options.pythonpath
invalidation_globs = set()
globs = set(
sys.path
+ bootstrap_options.pythonpath
+ bootstrap_options.pants_config_files
+ bootstrap_options.pantsd_invalidation_globs
)

for glob in globs:
glob_relpath = os.path.relpath(glob, buildroot)
if glob_relpath and (not glob_relpath.startswith("../")):
invalidation_globs.extend([glob_relpath, glob_relpath + "/**"])
if glob.startswith("!"):
invalidation_globs.add(glob)
continue

glob_relpath = fast_relpath_optional(glob, buildroot) if os.path.isabs(glob) else glob
if glob_relpath:
invalidation_globs.update([glob_relpath, glob_relpath + "/**"])
else:
logging.getLogger(__name__).warning(
logger.debug(
f"Changes to {glob}, outside of the buildroot, will not be invalidated."
)

return invalidation_globs
return list(sorted(invalidation_globs))

@classmethod
def create(cls, options_bootstrapper, build_configuration, init_subsystems=True):
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ def register_bootstrap_options(cls, register):
type=list,
default=[],
help="Filesystem events matching any of these globs will trigger a daemon restart. "
"The `--pythonpath` and `--pants-config-files` are inherently invalidated.",
"Pants' own code, plugins, and `--pants-config-files` are inherently invalidated.",
)

# Watchman options.
Expand Down
62 changes: 16 additions & 46 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import logging
import threading
from typing import List, Optional, Tuple, cast

from pants.base.exiter import PANTS_SUCCEEDED_EXIT_CODE
from pants.base.specs import Specs
from pants.engine.fs import PathGlobs, Snapshot
from pants.engine.internals.scheduler import ExecutionTimeoutError
from pants.engine.internals.scheduler import ExecutionError, ExecutionTimeoutError
from pants.engine.unions import UnionMembership
from pants.goal.run_tracker import RunTracker
from pants.init.engine_initializer import LegacyGraphScheduler, LegacyGraphSession
Expand Down Expand Up @@ -66,8 +65,6 @@ def __init__(
None,
)

self._loop_condition = LoopCondition()

def _get_snapshot(self, globs: Tuple[str, ...], poll: bool) -> Optional[Snapshot]:
"""Returns a Snapshot of the input globs.
Expand Down Expand Up @@ -157,8 +154,11 @@ def prepare_graph(self, options: Options) -> LegacyGraphSession:
global_options = options.for_global_scope()
build_id = RunTracker.global_instance().run_id
v2_ui = global_options.get("v2_ui", False)
use_colors = global_options.get("colors", True)
zipkin_trace_v2 = options.for_scope("reporting").zipkin_trace_v2
return self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui)
return self._graph_helper.new_session(
zipkin_trace_v2, build_id, v2_ui=v2_ui, use_colors=use_colors
)

def graph_run_v2(
self,
Expand All @@ -177,24 +177,20 @@ def graph_run_v2(
v2 = global_options.v2

if not perform_loop:
return self._body(session, options, options_bootstrapper, specs, v2)
return self._body(session, options, options_bootstrapper, specs, v2, poll=False)

iterations = global_options.loop_max
exit_code = PANTS_SUCCEEDED_EXIT_CODE

while iterations and not self._state.is_terminating:
# NB: We generate a new "run id" per iteration of the loop in order to allow us to
# observe fresh values for Goals. See notes in `scheduler.rs`.
session.scheduler_session.new_run_id()
try:
exit_code = self._body(session, options, options_bootstrapper, specs, v2)
except session.scheduler_session.execution_error_type as e:
exit_code = self._body(session, options, options_bootstrapper, specs, v2, poll=True)
except ExecutionError as e:
self._logger.warning(e)

iterations -= 1
while (
iterations
and not self._state.is_terminating
and not self._loop_condition.wait(timeout=1)
):
continue

return exit_code

Expand All @@ -205,6 +201,7 @@ def _body(
options_bootstrapper: OptionsBootstrapper,
specs: Specs,
v2: bool,
poll: bool,
) -> int:
exit_code = PANTS_SUCCEEDED_EXIT_CODE

Expand All @@ -213,13 +210,16 @@ def _body(
if v2_goals or (ambiguous_goals and v2):
goals = v2_goals + (ambiguous_goals if v2 else tuple())

# N.B. @goal_rules run pre-fork in order to cache the products they request during execution.
# When polling we use a delay (only applied in cases where we have waited for something
# to do) in order to avoid re-running too quickly when changes arrive in clusters.
exit_code = session.run_goal_rules(
options_bootstrapper=options_bootstrapper,
union_membership=self._union_membership,
options=options,
goals=goals,
specs=specs,
poll=poll,
poll_delay=(0.1 if poll else None),
)

return exit_code
Expand All @@ -231,33 +231,3 @@ def run(self):
self._check_invalidation_watcher_liveness()
# NB: This is a long poll that will keep us from looping too quickly here.
self._check_invalidation_globs(poll=True)


class LoopCondition:
    """Condition-variable wrapper used to decide when loop consumers should re-run.

    Any number of threads may wait on and/or notify the condition.
    """

    def __init__(self):
        super().__init__()
        # Bumped on every notification so `wait` can tell a notification from a timeout.
        self._iteration = 0
        self._condition = threading.Condition(threading.Lock())

    def notify_all(self):
        """Record a new iteration and wake every thread waiting on the condition."""
        with self._condition:
            self._iteration = self._iteration + 1
            self._condition.notify_all()

    def wait(self, timeout):
        """Block until notified, or until `timeout` elapses.

        Generally called in a loop until the condition triggers.

        :param timeout: Passed through to the underlying `threading.Condition.wait`.
        :returns: True if the iteration counter changed while waiting (that is, `notify_all`
          ran), False otherwise.
        """
        with self._condition:
            seen = self._iteration
            self._condition.wait(timeout)
            triggered = seen != self._iteration
        return triggered
Loading

0 comments on commit faa4eb4

Please sign in to comment.