diff --git a/README.markdown b/README.markdown index e374371f0e5e..ffde13756188 100644 --- a/README.markdown +++ b/README.markdown @@ -12,7 +12,8 @@ also trigger actions (such as rebuilding assets) when matching files change. Watchman is known to compile and pass its test suite on: * Linux systems with `inotify` - * OS X and BSDish systems (FreeBSD 9.1, OpenBSD 5.2) that have the + * OS X (uses FSEvents) + * BSDish systems (FreeBSD 9.1, OpenBSD 5.2) that have the `kqueue(2)` facility * Illumos and Solaris style systems that have `port_create(3C)` @@ -1093,9 +1094,8 @@ haven't actually changed. ### Max OS File Descriptor Limits The default per-process descriptor limit on current versions of OS X is -extremely low (256!). Since kqueue() requires an open descriptor for each -watched directory, you will very quickly run into resource limits if your trees -are large or if you do not raise the limits in your system configuration. +extremely low (256!). More recent versions of watchman (2.9.2 and later) +use FSEvents and are not so sensitive to descriptor limits. Watchman will attempt to raise its descriptor limit to match `kern.maxfilesperproc` when it starts up, so you shouldn't need to mess diff --git a/configure.ac b/configure.ac index 64f057b22bc8..edf2a62c4077 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT([watchman], [2.9.1], [], [watchman]) +AC_INIT([watchman], [2.9.2], [], [watchman]) AC_CANONICAL_TARGET AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects]) @@ -167,6 +167,9 @@ AC_CHECK_HEADERS(sys/types.h inttypes.h locale.h port.h sys/inotify.h sys/event. AC_CHECK_FUNCS(kqueue port_create inotify_init strtoll localeconv) AC_CHECK_HEADERS(valgrind/valgrind.h) AC_CHECK_HEADERS(execinfo.h) +AC_CHECK_HEADERS(CoreServices/CoreServices.h, [ + LIBS="$LIBS -framework CoreServices" +]) AC_CHECK_FUNCS(backtrace backtrace_symbols) # Do this after we've looked for functions diff --git a/listener.c b/listener.c index 6a85f2199112..ef8d8345895a 100644 --- a/listener.c +++ b/listener.c @@ -659,7 +659,7 @@ static void wakeme(int signo) unused_parameter(signo); } -#ifdef HAVE_KQUEUE +#if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS) #ifdef __OpenBSD__ #include #endif @@ -691,7 +691,7 @@ bool w_start_listener(const char *path) hb = gimli_heartbeat_attach(); #endif -#ifdef HAVE_KQUEUE +#if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS) { struct rlimit limit; int mib[2] = { CTL_KERN, @@ -730,12 +730,14 @@ bool w_start_listener(const char *path) } getrlimit(RLIMIT_NOFILE, &limit); +#ifndef HAVE_FSEVENTS if (limit.rlim_cur < 10240) { w_log(W_LOG_ERR, "Your file descriptor limit is very low (%" PRIu64 "), " "please consult the watchman docs on raising the limits\n", limit.rlim_cur); } +#endif } #endif diff --git a/root.c b/root.c index 169366d82658..f0a6d36d6149 100644 --- a/root.c +++ b/root.c @@ -23,8 +23,10 @@ static uint32_t next_root_number = 1; * will have its wd set to -1. This means that if another directory with the * same wd shows up, it's OK to replace this with that. */ static struct watchman_dir dir_pending_ignored; +#define HAVE_PER_FILE_NOTIFICATIONS 1 #endif + /* small for testing, but should make this greater than the number of dirs we * have in our repos to avoid realloc */ #define HINT_NUM_DIRS 128*1024 @@ -45,6 +47,21 @@ static void crawler(w_root_t *root, w_string_t *dir_name, static void w_root_teardown(w_root_t *root); +#if HAVE_FSEVENTS +static void *fsevents_thread(void *arg); +static void start_fsevents_thread(w_root_t *root) +{ + if (root->fse_started) { + return; + } + root->fse_started = true; + // Spin up the fsevents processing thread; it owns a ref on the root + w_root_addref(root); + pthread_create(&root->fse_thread, NULL, fsevents_thread, root); +} +#define HAVE_PER_FILE_NOTIFICATIONS 1 +#endif + static void free_pending(struct watchman_pending_fs *p) { w_string_delref(p->path); @@ -138,6 +155,18 @@ static bool w_root_init(w_root_t *root, char **errmsg) w_set_cloexec(root->infd); root->wd_to_dir = w_ht_new(HINT_NUM_DIRS, NULL); #endif +#if HAVE_FSEVENTS + if (pipe(root->fse_pipe)) { + ignore_result(asprintf(errmsg, "watch(%.*s): pipe error: %s", + root->root_path->len, root->root_path->buf, strerror(errno))); + w_log(W_LOG_ERR, "%s\n", *errmsg); + return false; + } + w_set_cloexec(root->fse_pipe[0]); + w_set_cloexec(root->fse_pipe[1]); + pthread_mutex_init(&root->fse_mtx, NULL); + pthread_cond_init(&root->fse_cond, NULL); +#endif #if HAVE_KQUEUE root->kq_fd = kqueue(); if (root->kq_fd == -1) { @@ -307,10 +336,10 @@ bool w_root_sync_to_now(w_root_t *root, int timeoutms) /* generate a cookie name: cookie prefix + id */ w_root_lock(root); tick = root->ticks++; - path_str = w_string_make_printf("%.*s%" PRIu32, + path_str = w_string_make_printf("%.*s%" PRIu32 "-%" PRIu32, root->query_cookie_prefix->len, root->query_cookie_prefix->buf, - tick); + root->number, tick); /* insert our cookie in the map */ w_ht_set(root->query_cookies, w_ht_ptr_val(path_str), w_ht_ptr_val(&cookie)); @@ -818,10 +847,11 @@ static void stat_path(w_root_t *root, } if (!file->exists || via_notify || did_file_change(&file->st, &st)) { w_log(W_LOG_DBG, - "file changed exists=%d via_notify=%d stat-changed=%d %s\n", + "file changed exists=%d via_notify=%d stat-changed=%d isdir=%d %s\n", (int)file->exists, (int)via_notify, (int)(file->exists && !via_notify), + S_ISDIR(st.st_mode), path ); file->exists = true; @@ -838,12 +868,11 @@ static void stat_path(w_root_t *root, // but do if we're looking at the cookie dir (stat_path is never // called for the root itself) w_string_equal(full_path, root->query_cookie_dir)) { -#ifndef HAVE_INOTIFY_INIT - /* On non-Linux systems, we always need to crawl, but may not - * need to be fully recursive */ +#ifndef HAVE_PER_FILE_NOTIFICATIONS + /* we always need to crawl, but may not need to be fully recursive */ crawler(root, full_path, now, recursive); #else - /* On Linux systems we get told about change on the child, so we only + /* we get told about changes on the child, so we only * need to crawl if we've never seen the dir before */ if (recursive) { @@ -851,8 +880,12 @@ static void stat_path(w_root_t *root, } #endif } + } else if (dir_ent) { + // We transitioned from dir to file (see fishy.php), so we should prune + // our former tree here + w_root_mark_deleted(root, dir_ent, now, true); } -#ifdef HAVE_INOTIFY_INIT +#ifdef HAVE_PER_FILE_NOTIFICATIONS if (!S_ISDIR(st.st_mode) && !w_string_equal(dir_name, root->root_path)) { /* Make sure we update the mtime on the parent directory. */ stat_path(root, dir_name, now, false, via_notify); @@ -887,7 +920,7 @@ void w_root_process_path(w_root_t *root, w_string_t *full_path, // XXX Only Linux tells us about filenames, so via_notify will only work // there. Need to figure out a different solution for other platforms. bool consider_cookie = -#if HAVE_INOTIFY_INIT +#ifdef HAVE_PER_FILE_NOTIFICATIONS via_notify || !root->done_initial; #else true; @@ -1047,7 +1080,7 @@ static void crawler(w_root_t *root, w_string_t *dir_name, return; } -#if !HAVE_INOTIFY_INIT +#if !HAVE_PER_FILE_NOTIFICATIONS if (dir->wd == -1) { int newwd; w_log(W_LOG_DBG, "watch_dir(%s)\n", path); @@ -1125,7 +1158,7 @@ static void crawler(w_root_t *root, w_string_t *dir_name, } #endif // HAVE_PORT_CREATE } -#endif // !HAVE_INOTIFY_INIT +#endif // !HAVE_PER_FILE_NOTIFICATIONS /* flag for delete detection */ if (w_ht_first(dir->files, &i)) do { @@ -1556,6 +1589,9 @@ static bool handle_should_recrawl(w_root_t *root) // this should cause us to exit from the notify loop w_root_cancel(root); } +#if HAVE_FSEVENTS + start_fsevents_thread(root); +#endif return true; } return false; @@ -1563,6 +1599,26 @@ static bool handle_should_recrawl(w_root_t *root) static bool wait_for_notify(w_root_t *root, int timeoutms) { +#if HAVE_FSEVENTS + struct timeval now, delta, target; + struct timespec ts; + + if (timeoutms == 0 || root->fse_head) { + return root->fse_head ? true : false; + } + + // Add timeout to current time, convert to absolute timespec + gettimeofday(&now, NULL); + delta.tv_sec = timeoutms / 1000; + delta.tv_usec = (timeoutms - (delta.tv_sec * 1000)) * 1000; + w_timeval_add(now, delta, &target); + w_timeval_to_timespec(target, &ts); + + pthread_mutex_lock(&root->fse_mtx); + pthread_cond_timedwait(&root->fse_cond, &root->fse_mtx, &ts); + pthread_mutex_unlock(&root->fse_mtx); + return root->fse_head ? true : false; +#else int n; struct pollfd pfd; @@ -1580,8 +1636,264 @@ static bool wait_for_notify(w_root_t *root, int timeoutms) n = poll(&pfd, 1, timeoutms); return n == 1; +#endif +} + +#if HAVE_FSEVENTS + +// The ignore logic is to stop recursion of grandchildren or later +// generations than an ignored dir. We allow the direct children +// of an ignored dir, but no further down. +static bool is_ignored(w_root_t *root, const char *path, uint32_t pathlen) +{ + w_ht_iter_t i; + + if (w_ht_first(root->ignore_dirs, &i)) do { + w_string_t *ign = w_ht_val_ptr(i.value); + + if (pathlen <= ign->len) { + continue; + } + + if (memcmp(ign->buf, path, ign->len) == 0) { + // prefix matches, but it isn't a parent + if (path[ign->len] != '/') { + continue; + } + + // If we find any '/' in the remainder of the path, then we should + // ignore it. Otherwise we allow it. + path += ign->len + 1; + pathlen -= ign->len + 1; + if (memchr(path, '/', pathlen)) { + return true; + } + } + + } while (w_ht_next(root->ignore_dirs, &i)); + + return false; +} + +static void fse_callback(ConstFSEventStreamRef streamRef, + void *clientCallBackInfo, + size_t numEvents, + void *eventPaths, + const FSEventStreamEventFlags eventFlags[], + const FSEventStreamEventId eventIds[]) +{ + size_t i; + char **paths = eventPaths; + w_root_t *root = clientCallBackInfo; + char pathbuf[WATCHMAN_NAME_MAX]; + struct watchman_fsevent *head = NULL, *tail = NULL, *evt; + + unused_parameter(streamRef); + unused_parameter(eventIds); + unused_parameter(eventFlags); + + for (i = 0; i < numEvents; i++) { + uint32_t len; + + len = strlen(paths[i]); + if (len >= sizeof(pathbuf)-1) { + w_log(W_LOG_DBG, "FSEvents: %s name is too big :-(\n", paths[i]); + w_root_cancel(root); + break; + } + + strcpy(pathbuf, paths[i]); + while (pathbuf[len-1] == '/') { + pathbuf[len-1] = '\0'; + len--; + } + + if (is_ignored(root, paths[i], len)) { + continue; + } + + evt = calloc(1, sizeof(*evt)); + if (!evt) { + w_log(W_LOG_DBG, "FSEvents: OOM!"); + w_root_cancel(root); + break; + } + + evt->path = w_string_new(pathbuf); + evt->flags = eventFlags[i]; + if (tail) { + tail->next = evt; + } else { + head = evt; + } + tail = evt; + + w_log(W_LOG_DBG, "fse_thread: add %s %" PRIx32 "\n", pathbuf, evt->flags); + } + + pthread_mutex_lock(&root->fse_mtx); + if (root->fse_tail) { + root->fse_tail->next = head; + } else { + root->fse_head = head; + } + root->fse_tail = tail; + pthread_mutex_unlock(&root->fse_mtx); + pthread_cond_signal(&root->fse_cond); +} + +static void fse_pipe_callback(CFFileDescriptorRef fdref, + CFOptionFlags callBackTypes, void *info) +{ + w_root_t *root = info; + + unused_parameter(fdref); + unused_parameter(callBackTypes); + + w_log(W_LOG_DBG, "fse_thread[%.*s]: pipe signalled\n", + root->root_path->len, root->root_path->buf); + CFRunLoopStop(CFRunLoopGetCurrent()); +} + +static void *fsevents_thread(void *arg) +{ + w_root_t *root = arg; + FSEventStreamContext ctx; + CFMutableArrayRef parray; + CFStringRef cpath; + FSEventStreamRef fs_stream; + CFFileDescriptorContext fdctx; + CFFileDescriptorRef fdref; + + memset(&ctx, 0, sizeof(ctx)); + ctx.info = root; + + memset(&fdctx, 0, sizeof(fdctx)); + fdctx.info = root; + + fdref = CFFileDescriptorCreate(NULL, root->fse_pipe[0], true, + fse_pipe_callback, &fdctx); + CFFileDescriptorEnableCallBacks(fdref, kCFFileDescriptorReadCallBack); + { + CFRunLoopSourceRef fdsrc; + + fdsrc = CFFileDescriptorCreateRunLoopSource(NULL, fdref, 0); + CFRunLoopAddSource(CFRunLoopGetCurrent(), fdsrc, kCFRunLoopDefaultMode); + CFRelease(fdsrc); + } + + parray = CFArrayCreateMutable(NULL, 0, &kCFTypeArrayCallBacks); + if (!parray) { + w_log(W_LOG_ERR, "watch(%.*s): CFArrayCreateMutable failed\n", + root->root_path->len, root->root_path->buf); + goto done; + } + + cpath = CFStringCreateWithBytes(NULL, (const UInt8*)root->root_path->buf, + root->root_path->len, kCFStringEncodingUTF8, + false); + if (!cpath) { + w_log(W_LOG_ERR, + "watch(%.*s): CFStringCreateWithBytes failed\n", + root->root_path->len, root->root_path->buf); + goto done; + } + + CFArrayAppendValue(parray, cpath); + CFRelease(cpath); + + fs_stream = FSEventStreamCreate(NULL, fse_callback, + &ctx, parray, kFSEventStreamEventIdSinceNow, + 0.0001, + kFSEventStreamCreateFlagNoDefer| + kFSEventStreamCreateFlagWatchRoot| + kFSEventStreamCreateFlagFileEvents); + + if (!fs_stream) { + w_log(W_LOG_ERR, "watch(%.*s): FSEventStreamCreate failed\n", + root->root_path->len, root->root_path->buf); + goto done; + } + + FSEventStreamScheduleWithRunLoop(fs_stream, + CFRunLoopGetCurrent(), kCFRunLoopDefaultMode); + FSEventStreamStart(fs_stream); + + CFRunLoopRun(); + + FSEventStreamStop(fs_stream); + FSEventStreamInvalidate(fs_stream); + FSEventStreamRelease(fs_stream); + CFRelease(fdref); + +done: + w_log(W_LOG_DBG, "fse_thread[%.*s] done\n", + root->root_path->len, root->root_path->buf); + w_root_delref(root); + return NULL; } +static int consume_fsevents(w_root_t *root) +{ + struct watchman_fsevent *head, *evt; + int n = 0; + struct timeval now; + bool recurse; + + pthread_mutex_lock(&root->fse_mtx); + head = root->fse_head; + root->fse_head = NULL; + root->fse_tail = NULL; + pthread_mutex_unlock(&root->fse_mtx); + + gettimeofday(&now, 0); + + while (head) { + evt = head; + head = head->next; + n++; + + if (evt->flags & kFSEventStreamEventFlagUserDropped) { + w_root_schedule_recrawl(root, "kFSEventStreamEventFlagUserDropped"); +break_out: + w_string_delref(evt->path); + free(evt); + break; + } + + if (evt->flags & kFSEventStreamEventFlagKernelDropped) { + w_root_schedule_recrawl(root, "kFSEventStreamEventFlagKernelDropped"); + goto break_out; + } + + if (evt->flags & kFSEventStreamEventFlagUnmount) { + w_log(W_LOG_ERR, "kFSEventStreamEventFlagUnmount %.*s, cancel watch\n", + evt->path->len, evt->path->buf); + w_root_cancel(root); + goto break_out; + } + + if (evt->flags & kFSEventStreamEventFlagRootChanged) { + w_log(W_LOG_ERR, + "kFSEventStreamEventFlagRootChanged %.*s, cancel watch\n", + evt->path->len, evt->path->buf); + w_root_cancel(root); + goto break_out; + } + + recurse = (evt->flags & kFSEventStreamEventFlagMustScanSubDirs) + ? true : false; + + w_root_add_pending(root, evt->path, recurse, now, true); + + w_string_delref(evt->path); + free(evt); + } + + return n; +} +#endif + #if HAVE_KQUEUE static int consume_kqueue(w_root_t *root, int timeoutms) @@ -1876,6 +2188,8 @@ static bool consume_notify(w_root_t *root) { #if HAVE_INOTIFY_INIT return try_read_inotify(root); +#elif HAVE_FSEVENTS + return consume_fsevents(root); #elif HAVE_KQUEUE return consume_kqueue(root, 0); #elif HAVE_PORT_CREATE @@ -1892,6 +2206,10 @@ static bool consume_notify(w_root_t *root) // we have drained the inotify descriptor static void notify_thread(w_root_t *root) { +#if HAVE_FSEVENTS + start_fsevents_thread(root); +#endif + /* now we can settle into the notification stuff */ while (!root->cancelled) { int timeoutms = MAX(root->trigger_settle, 100); @@ -2029,6 +2347,26 @@ static void w_root_teardown(w_root_t *root) close(root->port_fd); root->port_fd = -1; #endif +#ifdef HAVE_FSEVENTS + // wait for fsevents thread to quit + if (!pthread_equal(root->fse_thread, pthread_self())) { + void *ignore; + pthread_join(root->fse_thread, &ignore); + } + + pthread_cond_destroy(&root->fse_cond); + pthread_mutex_destroy(&root->fse_mtx); + close(root->fse_pipe[0]); + close(root->fse_pipe[1]); + + while (root->fse_head) { + struct watchman_fsevent *evt = root->fse_head; + root->fse_head = evt->next; + + w_string_delref(evt->path); + free(evt); + } +#endif if (root->dirname_to_dir) { w_ht_free(root->dirname_to_dir); @@ -2321,6 +2659,9 @@ static void signal_root_threads(w_root_t *root) if (!pthread_equal(root->notify_thread, pthread_self())) { pthread_kill(root->notify_thread, SIGUSR1); } +#if HAVE_FSEVENTS + write(root->fse_pipe[1], "X", 1); +#endif } void w_root_schedule_recrawl(w_root_t *root, const char *why) diff --git a/tests/integration/remove.php b/tests/integration/remove.php index f51c917a11bf..56c8faec5d76 100644 --- a/tests/integration/remove.php +++ b/tests/integration/remove.php @@ -47,6 +47,10 @@ function testRemove() { if (PHP_OS == 'Linux' && getenv('TRAVIS')) { $this->assertSkipped('openvz and inotify unlinks == bad time'); } + if (PHP_OS == 'Darwin') { + $this->assertSkipped( + "fseventsd doesn't signal kFSEventStreamEventFlagRootChanged"); + } $watches = $this->waitForWatchman( array('watch-list'), function ($list) use ($root) { diff --git a/tests/integration/trigger.php b/tests/integration/trigger.php index 02a5f68caaf8..5c88576f1068 100644 --- a/tests/integration/trigger.php +++ b/tests/integration/trigger.php @@ -24,6 +24,15 @@ function validateTriggerOutput($root) { $logdata, "got the right filename in the log"); + $this->waitFor(function () use ($root) { + if (file_exists("$root/trigger.json")) { + return json_decode( + file_get_contents("$root/trigger.json") + ); + } + return false; + }, 5, "created trigger.json"); + // Validate that the json input is properly formatted $lines = 0; foreach (file("$root/trigger.json") as $line) { @@ -83,7 +92,8 @@ function testTrigger() { touch("$root/foo.c"); - $this->assertWaitForLog('/posix_spawnp/', 60); + $this->watchmanCommand('log', 'debug', 'waiting for spawnp ' . __LINE__); + $this->assertWaitForLog('/posix_spawnp/', 5); $this->stopLogging(); @@ -131,7 +141,8 @@ function testTrigger() { // make sure the triggers didn't get deleted $this->assertTriggerList($root, $trig_list); - $this->assertWaitForLog('/posix_spawnp/', 60); + $this->watchmanCommand('log', 'debug', 'waiting for spawnp ' . __LINE__); + $this->assertWaitForLog('/posix_spawnp/', 5); $this->stopLogging(); // and that the right data was seen diff --git a/watchman.h b/watchman.h index 0ec5e23057ed..432905608ca5 100644 --- a/watchman.h +++ b/watchman.h @@ -67,6 +67,13 @@ extern char **environ; #include "jansson.h" +#ifdef HAVE_CORESERVICES_CORESERVICES_H +# include +# define HAVE_FSEVENTS 1 +# undef HAVE_KQUEUE +#endif + + /* sane, reasonably large filename size that we'll use * throughout; POSIX seems to define smallish buffers * that seem risky */ @@ -138,6 +145,14 @@ struct watchman_string { #define WATCHMAN_PORT_EVENTS \ FILE_MODIFIED | FILE_ATTRIB | FILE_NOFOLLOW +#if HAVE_FSEVENTS +struct watchman_fsevent { + struct watchman_fsevent *next; + w_string_t *path; + FSEventStreamEventFlags flags; +}; +#endif + struct watchman_file; struct watchman_dir; struct watchman_root; @@ -265,6 +280,16 @@ struct watchman_root { #endif #if HAVE_PORT_CREATE int port_fd; +#endif +#if HAVE_FSEVENTS + int fse_pipe[2]; + bool fse_started; + + pthread_mutex_t fse_mtx; + pthread_cond_t fse_cond; + pthread_t fse_thread; + + struct watchman_fsevent *fse_head, *fse_tail; #endif /* map of dir name to a dir */ w_ht_t *dirname_to_dir;