Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a feature gate to disable the engine fs watcher introduced in #9318 #9416

Merged
merged 3 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,7 @@ def ti(type_obj):
execution_options.process_execution_use_local_cache,
self.context.utf8_dict(execution_options.remote_execution_headers),
execution_options.process_execution_local_enable_nailgun,
execution_options.experimental_fs_watcher,
)
if scheduler_result.is_throw:
value = self.context.from_value(scheduler_result.throw_handle)
Expand Down
11 changes: 11 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class ExecutionOptions:
remote_execution_extra_platform_properties: Any
remote_execution_headers: Any
process_execution_local_enable_nailgun: bool
experimental_fs_watcher: bool

@classmethod
def from_bootstrap_options(cls, bootstrap_options):
Expand All @@ -124,6 +125,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties,
remote_execution_headers=bootstrap_options.remote_execution_headers,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
experimental_fs_watcher=bootstrap_options.experimental_fs_watcher,
)


Expand All @@ -149,6 +151,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=[],
remote_execution_headers={},
process_execution_local_enable_nailgun=False,
experimental_fs_watcher=False,
)


Expand Down Expand Up @@ -860,6 +863,14 @@ def register_bootstrap_options(cls, register):
help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.",
advanced=True,
)
register(
"--experimental-fs-watcher",
type=bool,
default=False,
advanced=True,
help="Whether to use the engine filesystem watcher which registers the workspace"
" for kernel file change events",
)

@classmethod
def register_options(cls, register):
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> RawResult {
match make_core(
tasks_ptr,
Expand Down Expand Up @@ -236,6 +237,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache,
remote_execution_headers_buf,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
) {
Ok(core) => RawResult {
is_throw: false,
Expand Down Expand Up @@ -278,6 +280,7 @@ fn make_core(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
let root_type_ids = root_type_ids.to_vec();
let ignore_patterns = ignore_patterns_buf
Expand Down Expand Up @@ -386,6 +389,7 @@ fn make_core(
process_execution_use_local_cache,
remote_execution_headers,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl Core {
process_execution_use_local_cache: bool,
remote_execution_headers: BTreeMap<String, String>,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
// Randomize CAS address order to avoid thundering herds from common config.
let mut remote_store_servers = remote_store_servers;
Expand Down Expand Up @@ -251,6 +252,7 @@ impl Core {
executor.clone(),
build_root.clone(),
ignorer.clone(),
experimental_fs_watcher,
)?;

Ok(Core {
Expand Down
187 changes: 96 additions & 91 deletions src/rust/engine/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct InvalidationWatcher {
watcher: Arc<Mutex<RecommendedWatcher>>,
executor: Executor,
liveness: Receiver<()>,
enabled: bool,
}

impl InvalidationWatcher {
Expand All @@ -49,6 +50,7 @@ impl InvalidationWatcher {
executor: Executor,
build_root: PathBuf,
ignorer: Arc<GitignoreStyleExcludes>,
enabled: bool,
) -> Result<InvalidationWatcher, String> {
// Inotify events contain canonical paths to the files being watched.
// If the build_root contains a symlink the paths returned in notify events
Expand All @@ -59,103 +61,106 @@ impl InvalidationWatcher {
let (watch_sender, watch_receiver) = crossbeam_channel::unbounded();
let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50))
.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?;
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}
let wrapped_watcher = Arc::new(Mutex::new(watcher));

let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded();
thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
if enabled {
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}

thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};

Ok(InvalidationWatcher {
watcher: wrapped_watcher,
watcher: Arc::new(Mutex::new(watcher)),
executor,
liveness: thread_liveness_receiver,
enabled,
})
}

Expand All @@ -164,8 +169,8 @@ impl InvalidationWatcher {
///
pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> {
// Short circuit here if we are on a Darwin platform because we should be watching
// the entire build root recursively already.
if cfg!(target_os = "macos") {
// the entire build root recursively already, or if we are not enabled.
if cfg!(target_os = "macos") || !self.enabled {
Ok(())
} else {
// Using a futurized mutex here because for some reason using a regular mutex
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/watch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn setup_watch(
) -> InvalidationWatcher {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let executor = Executor::new(rt.handle().clone());
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer)
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer, /*enabled*/ true)
.expect("Couldn't create InvalidationWatcher");
rt.block_on(watcher.watch(file_path)).unwrap();
watcher
Expand Down