addons/isl-server/src/watchman.tsblame
View source
/**
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
b69ab317
b69ab318import type {Logger} from './logger';
b69ab319
b69ab3110import watchman from 'fb-watchman';
b69ab3111import {EventEmitter} from 'node:events';
b69ab3112import path from 'node:path';
b69ab3113import {type ServerSideTracker} from './analytics/serverSideTracker';
b69ab3114import {firstOfIterable, serializeAsyncCall, sleep} from './utils';
b69ab3115
/**
 * Options object sent as the final argument of the watchman `subscribe`
 * command. Callers may supply any of these; the `Watchman` class in this file
 * fills in or overrides `fields`, `since`, `relative_root` and
 * `empty_on_fresh_instance` before subscribing.
 */
export type WatchmanSubscriptionOptions = {
  /** Per-file fields to request; defaults to `['name', 'new', 'exists']` when omitted. */
  fields?: Array<string>;
  /** Watchman query expression, forwarded to watchman unchanged. */
  expression?: Array<unknown>;
  /** Clock value to report changes since; set from a `clock` query when subscribing. */
  since?: string;
  defer?: Array<string>;
  defer_vcs?: boolean;
  /** Path relative to the watch root; set from `watch-project`'s `relative_path` when present. */
  relative_root?: string;
  /** Always forced to `true` by `Watchman.watchDirectoryRecursive`. */
  empty_on_fresh_instance?: boolean;
};
b69ab3125
/**
 * Book-keeping record for one named watchman subscription. A single record is
 * shared by every caller that watches the same directory/name pair.
 */
export type WatchmanSubscription = {
  /** Watch root returned by `watch-project`; the subscription is registered against this root. */
  root: string;
  /**
   * The relative path from subscriptionRoot to subscriptionPath.
   * This is the 'relative_path' as described at
   * https://facebook.github.io/watchman/docs/cmd/watch-project.html#using-watch-project.
   * Notably, this value should be undefined if subscriptionRoot is the same as
   * subscriptionPath.
   */
  pathFromSubscriptionRootToSubscriptionPath: string | undefined;
  /** The directory the caller originally asked to watch. */
  path: string;
  /** Unique subscription name derived from the directory path and the caller's raw name. */
  name: string;
  /** Number of outstanding watchers sharing this record; the subscription is torn down at zero. */
  subscriptionCount: number;
  /** Options the subscription was (and on reconnect, will again be) created with. */
  options: WatchmanSubscriptionOptions;
  /** Emits `'change'` with the changed files and `'fresh-instance'` when watchman reports one. */
  emitter: EventEmitter;
};
b69ab3142
/**
 * Payload delivered with the watchman client's `'subscription'` event.
 * Which optional members are present determines how `Watchman` reacts:
 * a fresh instance, a file-change batch, a cancellation, or a state
 * enter/leave notification.
 */
export type WatchmanSubscriptionResponse = {
  /** Watch root the event belongs to; used as the key for the last known clock. */
  root: string;
  /** Name of the subscription this event is for. */
  subscription: string;
  /** Changed files, when this is a change notification. */
  files?: Array<FileChange>;
  'state-enter'?: string;
  'state-leave'?: string;
  /** `true` when the watch was deleted; `Watchman` ends the client in response. */
  canceled?: boolean;
  /** Clock value of this event, remembered so a reconnect can resume from it. */
  clock?: string;
  is_fresh_instance?: boolean;
};
b69ab3153
/**
 * One entry of a subscription response's `files` array. The members mirror
 * the default field list (`name`, `new`, `exists`) requested when a caller
 * does not supply its own `fields`.
 */
export type FileChange = {
  name: string;
  new: boolean;
  exists: boolean;
};
b69ab3159
/** How long to wait after `watch-project` before re-subscribing during a reconnect, in milliseconds. */
const WATCHMAN_SETTLE_TIME_MS = 2500;
/** Initial reconnect backoff delay; also the value the backoff resets to after a full restore. */
const DEFAULT_WATCHMAN_RECONNECT_DELAY_MS = 100;
/** Upper bound for the exponentially growing reconnect backoff delay (one minute). */
const MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS = 60 * 1000;
b69ab3163
/**
 * Wrapper around an `fb-watchman` client.
 *
 * It keeps a registry of named subscriptions (shared and reference-counted
 * when several callers watch the same directory/name pair), forwards
 * subscription events to each subscription's `emitter`, and replaces the
 * client with a fresh one — restoring every registered subscription — when
 * the connection ends or errors.
 */
export class Watchman {
  private client: watchman.Client;

  /** Retry loop around `reconnectClient`, wrapped with `serializeAsyncCall`. */
  private serializedReconnect: () => Promise<void>;
  /** Current backoff delay; doubled after each failed reconnect, capped at the maximum. */
  private reconnectDelayMs: number = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
  /** Registered subscriptions keyed by normalized subscription name. */
  private subscriptions: Map<string, WatchmanSubscription> = new Map();
  /** Most recent clock seen per watch root, used to resume after a reconnect. */
  private lastKnownClockTimes: Map<string, string> = new Map();

  public readonly status: 'initializing' | 'reconnecting' | 'healthy' | 'ended' | 'errored' =
    'initializing';

  /** Optional observer invoked by `setStatus` with every new status. */
  public onStatusChange: ((status: typeof this.status) => void) | undefined;

  /**
   * Creates the watchman client, attaches its event handlers and prepares the
   * reconnect loop.
   *
   * @param logger Destination for diagnostic messages.
   * @param tracker Receives an error event whenever the client reports an error.
   */
  constructor(
    private logger: Logger,
    private tracker: ServerSideTracker,
  ) {
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    this.initWatchmanClient();
    this.serializedReconnect = serializeAsyncCall(async () => {
      let tries = 0;
      // Keep retrying until a reconnect succeeds, backing off between attempts.
      while (true) {
        try {
          // eslint-disable-next-line no-await-in-loop
          await this.reconnectClient();
          return;
        } catch (error) {
          this.logger.warn(
            `reconnectClient failed (try #${tries}):`,
            error instanceof Error ? error.message : error,
          );
          tries++;

          this.reconnectDelayMs *= 2; // exponential backoff
          if (this.reconnectDelayMs > MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS) {
            this.reconnectDelayMs = MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS;
          }

          this.logger.info(
            'Calling reconnectClient from _serializedReconnect in %dms',
            this.reconnectDelayMs,
          );
          // eslint-disable-next-line no-await-in-loop
          await sleep(this.reconnectDelayMs);
        }
      }
    });
  }

  /**
   * Records a new status, logs it, and notifies `onStatusChange` if set.
   *
   * @param status The status to publish.
   */
  setStatus(status: typeof this.status): void {
    this.logger.log('Watchman status: ', status);
    // `status` is readonly to consumers; this method is the single writer.
    (this.status as string) = status;
    this.onStatusChange?.(status);
  }

  /**
   * Subscribes to changes under a directory. If a subscription with the same
   * directory and name already exists, its reference count is incremented and
   * the existing record is returned instead of subscribing again.
   *
   * @param localDirectoryPath Directory to watch.
   * @param rawSubscriptionName Caller-chosen name; combined with the path to form a unique name.
   * @param subscriptionOptions Extra subscribe options. `since` and
   *   `empty_on_fresh_instance` are always overridden, and `relative_root` is
   *   overridden when the directory is nested inside the watch root.
   * @returns The subscription record whose `emitter` delivers events.
   * @throws Rejects if any of the underlying watchman commands fails.
   */
  public async watchDirectoryRecursive(
    localDirectoryPath: string,
    rawSubscriptionName: string,
    subscriptionOptions?: WatchmanSubscriptionOptions,
  ): Promise<WatchmanSubscription> {
    // Subscriptions should be unique by name and by folder
    const subscriptionName = this.fixupName(localDirectoryPath, rawSubscriptionName);
    const existingSubscription = this.getSubscription(subscriptionName);
    if (existingSubscription) {
      existingSubscription.subscriptionCount++;
      return existingSubscription;
    }

    const {watch: watchRoot, relative_path: relativePath} =
      await this.watchProject(localDirectoryPath);
    const clock = await this.clock(watchRoot);
    const options: WatchmanSubscriptionOptions = {
      ...subscriptionOptions,
      // Do not add `mode` here, it is very unfriendly to watches on Eden (see https://fburl.com/0z023yy0)
      fields: subscriptionOptions?.fields ?? ['name', 'new', 'exists'],
      since: clock,
    };
    if (relativePath) {
      options.relative_root = relativePath;
    }
    // Try this thing out where we always set empty_on_fresh_instance. Eden will be a lot happier
    // if we never ask Watchman to do something that results in a glob(**) near the root.
    options.empty_on_fresh_instance = true;

    // relativePath is undefined if watchRoot is the same as directoryPath.
    const subscription: WatchmanSubscription = {
      root: watchRoot,
      pathFromSubscriptionRootToSubscriptionPath: relativePath,
      path: localDirectoryPath,
      name: subscriptionName,
      subscriptionCount: 1,
      options,
      emitter: new EventEmitter(),
    };
    // Register before subscribing so events that arrive right away find their record.
    this.setSubscription(subscriptionName, subscription);
    await this.subscribe(watchRoot, subscriptionName, options);
    this.logger.log('watchman subscription %s established', subscriptionName);
    this.setStatus('healthy');

    return subscription;
  }

  /**
   * Releases one reference to a subscription. When the last reference is
   * released the watchman subscription is cancelled and the record removed.
   * Unsubscribe failures are logged, not thrown, so the record is always
   * cleaned up.
   *
   * @param path Directory that was passed to `watchDirectoryRecursive`.
   * @param name Raw subscription name that was passed to `watchDirectoryRecursive`.
   */
  public async unwatch(path: string, name: string): Promise<void> {
    const subscriptionName = this.fixupName(path, name);
    const subscription = this.getSubscription(subscriptionName);

    if (subscription == null) {
      this.logger.error(`No watcher entity found with path [${path}] name [${name}]`);
      return;
    }

    if (--subscription.subscriptionCount === 0) {
      // The subscription was registered against the watch root (see `subscribe` in
      // `watchDirectoryRecursive`), so it must be cancelled against that same root.
      // The watched directory itself may be a nested path that is not a watch root.
      await this.unsubscribe(subscription.root, subscription.name).catch(err => {
        // Directory maybe doesn't exist anymore. Don't propagate error - we want to clean
        // up subscription anyway so we don't get stuck reconnecting.
        this.logger.error(`error unsubscribing watchman subscription ${subscriptionName}: ${err}`);
      });
      this.deleteSubscription(subscriptionName);
      this.logger.log(`watchman subscription destroyed: ${subscriptionName}`);
    }
  }

  /** Attaches the `end`, `error` and `subscription` handlers to the current client. */
  private initWatchmanClient(): void {
    this.client.on('end', () => {
      this.setStatus('ended');
      this.logger.info('Watchman client ended');
      this.client.removeAllListeners();
      void this.serializedReconnect();
    });
    this.client.on('error', (error: Error) => {
      const statusBeforeError = this.status;
      this.logger.error('Error while talking to watchman: ', error);
      this.tracker.error(
        'WatchmanEvent',
        'WatchmanError',
        `Error while talking to watchman ${error}`,
      );
      this.setStatus('errored');
      // If Watchman encounters an error in the middle of a command, it may never finish!
      // The client must be immediately killed here so that the command fails and
      // `serializeAsyncCall` can be unblocked. Otherwise, we end up in a deadlock.
      this.client.removeAllListeners();
      this.client.end();
      if (statusBeforeError === 'initializing') {
        // If we get an error while we're first initializing watchman, it probably means
        // it's not installed properly. No use spamming reconnection failures too.
        return;
      }
      // Those are errors in deserializing a stream of changes.
      // The only possible recovery here is reconnecting a new client,
      // but the failed to serialize events will be missed.
      void this.serializedReconnect();
    });
    this.client.on('subscription', this.onSubscriptionResult.bind(this));
  }

  /**
   * Discards the current client, creates a new one and re-establishes every
   * registered subscription on it.
   *
   * @throws Rejects if restoring any subscription fails; the caller retries.
   */
  private async reconnectClient(): Promise<void> {
    // If we got an error after making a subscription, the reconnect needs to
    // remove that subscription to try again, so it doesn't keep leaking subscriptions.
    this.logger.info('Ending existing watchman client to reconnect a new one');
    this.setStatus('reconnecting');
    this.client.removeAllListeners();
    this.client.end();
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    this.logger.error('Watchman client disconnected, reconnecting a new client!');
    this.initWatchmanClient();
    this.logger.info('Watchman client re-initialized, restoring subscriptions');
    await this.restoreSubscriptions();
  }

  /**
   * Re-subscribes every registered subscription on the current client, all
   * in parallel. When at least one subscription exists and all were restored,
   * the backoff delay is reset and the status becomes `'healthy'`.
   */
  private async restoreSubscriptions(): Promise<void> {
    const watchSubscriptions = Array.from(this.subscriptions.values());
    const numSubscriptions = watchSubscriptions.length;
    this.logger.info(`Attempting to restore ${numSubscriptions} Watchman subscriptions.`);
    let numRestored = 0;
    await Promise.all(
      watchSubscriptions.map(async (subscription: WatchmanSubscription, index: number) => {
        // Note that this call to `watchman watch-project` could fail if the
        // subscription.path has been unmounted/deleted.
        await this.watchProject(subscription.path);

        // Change events from the disconnected period have already been missed and
        // watchman itself may have restarted, so give the filesystem time to settle.
        await sleep(WATCHMAN_SETTLE_TIME_MS);

        // Register the subscriptions after the filesystem settles.
        const {name, options, root} = subscription;

        // Assuming we had previously connected and gotten an event, we can
        // reconnect `since` that time, so that we get any events we missed.
        // Otherwise fall back to the current clock.
        subscription.options.since = this.lastKnownClockTimes.get(root) || (await this.clock(root));

        this.logger.info(`Subscribing to ${name}: (${index + 1}/${numSubscriptions})`);
        await this.subscribe(root, name, options);
        ++numRestored;
        this.logger.info(`Subscribed to ${name}: (${numRestored}/${numSubscriptions}) complete.`);
      }),
    );
    if (numRestored > 0 && numRestored === numSubscriptions) {
      this.logger.info('Successfully reconnected all %d subscriptions.', numRestored);
      // if everything got restored, reset the reconnect backoff time
      this.reconnectDelayMs = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
      this.setStatus('healthy');
    }
  }

  /** Looks up a subscription record by (normalized) subscription name. */
  private getSubscription(entryPath: string): WatchmanSubscription | undefined {
    return this.subscriptions.get(path.normalize(entryPath));
  }

  /** Stores a subscription record under its (normalized) subscription name. */
  private setSubscription(entryPath: string, subscription: WatchmanSubscription): void {
    const key = path.normalize(entryPath);
    this.subscriptions.set(key, subscription);
  }

  /** Removes the subscription record stored under the (normalized) subscription name. */
  private deleteSubscription(entryPath: string): void {
    const key = path.normalize(entryPath);
    this.subscriptions.delete(key);
  }

  /**
   * Handles one `'subscription'` event from the client: remembers its clock,
   * then emits `'fresh-instance'` or `'change'` on the matching subscription,
   * ends the client on cancellation, or logs a state transition.
   *
   * @param response The event payload from watchman.
   */
  private onSubscriptionResult(response: WatchmanSubscriptionResponse): void {
    const subscription = this.getSubscription(response.subscription);
    if (subscription == null) {
      this.logger.error('Subscription not found for response:!', response);
      return;
    }

    // save the clock time of this event in case we disconnect in the future
    if (response.root != null && response.clock != null) {
      this.lastKnownClockTimes.set(response.root, response.clock);
    }
    if (response.is_fresh_instance === true) {
      this.logger.warn(
        `Watch for ${response.root} (${response.subscription}) returned an empty fresh instance.`,
      );
      subscription.emitter.emit('fresh-instance');
    } else if (Array.isArray(response.files)) {
      subscription.emitter.emit('change', response.files);
    } else if (response.canceled === true) {
      this.logger.info(`Watch for ${response.root} was deleted: triggering a reconnect.`);
      // Ending the client will trigger a reconnect.
      this.client.end();
    } else {
      // Only log state transitions once per watchmanClient, since otherwise we'll just get 8x of this message spammed
      if (firstOfIterable(this.subscriptions.values()) === subscription) {
        // TODO(most): use state messages to decide on when to send updates.
        const stateEnter = response['state-enter'];
        const stateLeave = response['state-leave'];
        const stateMessage =
          stateEnter != null ? `Entering ${stateEnter}` : `Leaving ${stateLeave}`;
        const numSubscriptions = this.subscriptions.size;
        this.logger.info(`Subscription state: ${stateMessage} (${numSubscriptions})`);
      }
    }
  }

  /**
   * Builds the unique subscription name for a directory/name pair by
   * replacing every path separator in the directory with `-` and appending
   * the raw name.
   */
  private fixupName(directoryPath: string, name: string): string {
    const refinedPath = directoryPath.replace(/\\/g, '-').replace(/\//g, '-');
    return `${refinedPath}-${name}`;
  }

  /** Sends `unsubscribe` for the named subscription registered on `watchRoot`. */
  private unsubscribe(watchRoot: string, subscriptionName: string): Promise<unknown> {
    return this.command('unsubscribe', watchRoot, subscriptionName);
  }

  /**
   * Sends `watch-project` for a directory and logs any warning in the reply.
   *
   * @returns The watch root and, when the directory is nested inside that
   *   root, the relative path from the root to the directory.
   */
  private async watchProject(
    directoryPath: string,
  ): Promise<{watch: string; relative_path?: string}> {
    const response = (await this.command('watch-project', directoryPath)) as {
      watch: string;
      relative_path?: string;
      warning?: string;
    };
    if (response.warning) {
      this.logger.error('watchman warning: ', response.warning);
    }
    return response;
  }

  /** Sends `clock` for a watch root and returns the clock value from the reply. */
  private async clock(directoryPath: string): Promise<string> {
    const {clock} = (await this.command('clock', directoryPath)) as {clock: string};
    return clock;
  }

  /**
   * Sends `subscribe` for the given watch root, name and options.
   *
   * @returns The raw watchman reply; no caller in this class inspects it.
   */
  private subscribe(
    watchRoot: string,
    subscriptionName: string | undefined,
    options: WatchmanSubscriptionOptions,
  ): Promise<unknown> {
    this.logger.info(
      `Creating Watchman subscription ${String(subscriptionName)} under ${watchRoot}`,
      JSON.stringify(options),
    );
    return this.command('subscribe', watchRoot, subscriptionName, options);
  }

  /**
   * Promisify calls to watchman client. Failures are logged with the command
   * arguments and then rethrown.
   */
  private async command(...args: Array<unknown>): Promise<unknown> {
    try {
      return await new Promise((resolve, reject) => {
        this.client.command(args, (error, response) => (error ? reject(error) : resolve(response)));
      });
    } catch (error) {
      this.logger.error('Watchman command error: ', args, error);
      throw error;
    }
  }
}