/**
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

import type {Logger} from './logger';

import watchman from 'fb-watchman';
import {EventEmitter} from 'node:events';
import path from 'node:path';
import {type ServerSideTracker} from './analytics/serverSideTracker';
import {firstOfIterable, serializeAsyncCall, sleep} from './utils';
/**
 * Options forwarded verbatim as the third argument of Watchman's `subscribe`
 * command. Field names intentionally use Watchman's snake_case wire format.
 */
export type WatchmanSubscriptionOptions = {
  /** Per-file fields Watchman should include in each notification. */
  fields?: Array<string>;
  /** Watchman query expression restricting which files are reported. */
  expression?: Array<unknown>;
  /** Clock value; only changes observed after this clock are reported. */
  since?: string;
  /** Names of Watchman states during which notifications should be deferred. */
  defer?: Array<string>;
  /** Whether notifications are deferred while a VCS operation is in progress. */
  defer_vcs?: boolean;
  /** Path, relative to the watch root, that the subscription is scoped to. */
  relative_root?: string;
  /** When true, a fresh-instance result carries no file list. */
  empty_on_fresh_instance?: boolean;
};
/**
 * Book-keeping record for one named Watchman subscription.
 */
export type WatchmanSubscription = {
  /** The watch root reported by `watch-project` (the "subscriptionRoot"). */
  root: string;
  /**
   * The relative path from subscriptionRoot to subscriptionPath.
   * This is the 'relative_path' as described at
   * https://facebook.github.io/watchman/docs/cmd/watch-project.html#using-watch-project.
   * Notably, this value should be undefined if subscriptionRoot is the same as
   * subscriptionPath.
   */
  pathFromSubscriptionRootToSubscriptionPath: string | undefined;
  /** The directory the caller asked to watch (the "subscriptionPath"). */
  path: string;
  /** Subscription name as sent to Watchman; also used to look the record up. */
  name: string;
  /** Number of outstanding watchers sharing this subscription. */
  subscriptionCount: number;
  /** The options the subscription was (or will be re-) established with. */
  options: WatchmanSubscriptionOptions;
  /** Emits 'change' (with the file list) and 'fresh-instance' events. */
  emitter: EventEmitter;
};
/**
 * Shape of the unilateral 'subscription' payloads delivered by the Watchman
 * client. Which optional members are present depends on the kind of
 * notification (file changes, state transition, cancellation, ...).
 */
export type WatchmanSubscriptionResponse = {
  /** Watch root the notification belongs to. */
  root: string;
  /** Name of the subscription the notification belongs to. */
  subscription: string;
  /** Changed files, when this is a file-change notification. */
  files?: Array<FileChange>;
  /** Name of the state being entered, for state-enter notifications. */
  'state-enter'?: string;
  /** Name of the state being left, for state-leave notifications. */
  'state-leave'?: string;
  /** True when the watch backing the subscription was removed. */
  canceled?: boolean;
  /** Clock value at the time of the notification. */
  clock?: string;
  /** True when Watchman could not compute a delta and reports a fresh instance. */
  is_fresh_instance?: boolean;
};
/**
 * One changed file as reported by Watchman when the subscription requests the
 * default `['name', 'new', 'exists']` fields.
 */
export type FileChange = {
  /** File path as reported by Watchman. */
  name: string;
  /** Watchman's `new` field for the file. */
  new: boolean;
  /** Watchman's `exists` field for the file. */
  exists: boolean;
};
/** How long to wait after re-watching a project before re-subscribing. */
const WATCHMAN_SETTLE_TIME_MS = 2500;
/** Initial (and post-recovery) delay between reconnect attempts. */
const DEFAULT_WATCHMAN_RECONNECT_DELAY_MS = 100;
/** Upper bound for the exponentially growing reconnect delay (one minute). */
const MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS = 60 * 1000;
/**
 * Wrapper around an `fb-watchman` client that manages named, reference-counted
 * recursive directory subscriptions and transparently reconnects (with
 * exponential backoff) when the client ends or errors.
 *
 * Consumers listen on `subscription.emitter` for:
 *  - `'change'` (payload: the `files` array of the notification), and
 *  - `'fresh-instance'` (no payload).
 */
export class Watchman {
  private client: watchman.Client;

  /**
   * Reconnect loop wrapped in `serializeAsyncCall`. Per the comment in the
   * 'error' handler below, a stuck command can block this call, so the wrapper
   * presumably prevents overlapping reconnects (see ./utils to confirm).
   */
  private serializedReconnect: () => Promise<void>;
  private reconnectDelayMs: number = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
  /** Active subscriptions keyed by (normalized) subscription name. */
  private subscriptions: Map<string, WatchmanSubscription> = new Map();
  /** Most recent clock seen per watch root, used to resume after a reconnect. */
  private lastKnownClockTimes: Map<string, string> = new Map();

  public readonly status: 'initializing' | 'reconnecting' | 'healthy' | 'ended' | 'errored' =
    'initializing';

  /** Optional observer invoked on every `setStatus` call. */
  public onStatusChange: ((status: typeof this.status) => void) | undefined;

  constructor(
    private logger: Logger,
    private tracker: ServerSideTracker,
  ) {
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    // Assign the reconnect routine before registering the listeners that call it.
    this.serializedReconnect = serializeAsyncCall(async () => {
      let tries = 0;
      while (true) {
        try {
          // eslint-disable-next-line no-await-in-loop
          await this.reconnectClient();
          return;
        } catch (error) {
          this.logger.warn(
            `reconnectClient failed (try #${tries}):`,
            error instanceof Error ? error.message : error,
          );
          tries++;

          // Exponential backoff, capped at MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS.
          this.reconnectDelayMs = Math.min(
            this.reconnectDelayMs * 2,
            MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS,
          );

          this.logger.info(
            'Calling reconnectClient from _serializedReconnect in %dms',
            this.reconnectDelayMs,
          );
          // eslint-disable-next-line no-await-in-loop
          await sleep(this.reconnectDelayMs);
        }
      }
    });
    this.initWatchmanClient();
  }

  /**
   * Records the new connection status, logs it, and notifies `onStatusChange`.
   */
  setStatus(status: typeof this.status): void {
    this.logger.log('Watchman status: ', status);
    // `status` is readonly for consumers; this is the single internal writer.
    (this.status as string) = status;
    this.onStatusChange?.(status);
  }

  /**
   * Starts (or joins) a recursive watch on `localDirectoryPath`.
   *
   * Subscriptions are unique per (directory, name) pair. If one already exists
   * its reference count is incremented and the existing record is returned.
   *
   * @param localDirectoryPath Directory to watch.
   * @param rawSubscriptionName Caller-chosen name; combined with the path to
   *   form the actual Watchman subscription name.
   * @param subscriptionOptions Extra `subscribe` options. `since`,
   *   `empty_on_fresh_instance` and (when applicable) `relative_root` are
   *   always overridden; `fields` defaults to `['name', 'new', 'exists']`.
   * @returns The subscription record, whose `emitter` delivers notifications.
   * @throws Whatever the underlying Watchman commands reject with.
   */
  public async watchDirectoryRecursive(
    localDirectoryPath: string,
    rawSubscriptionName: string,
    subscriptionOptions?: WatchmanSubscriptionOptions,
  ): Promise<WatchmanSubscription> {
    // Subscriptions should be unique by name and by folder
    const subscriptionName = this.fixupName(localDirectoryPath, rawSubscriptionName);
    const existingSubscription = this.getSubscription(subscriptionName);
    if (existingSubscription) {
      existingSubscription.subscriptionCount++;
      return existingSubscription;
    }

    const {watch: watchRoot, relative_path: relativePath} =
      await this.watchProject(localDirectoryPath);
    const clock = await this.clock(watchRoot);
    const options: WatchmanSubscriptionOptions = {
      ...subscriptionOptions,
      // Do not add `mode` here, it is very unfriendly to watches on Eden (see https://fburl.com/0z023yy0)
      fields: subscriptionOptions?.fields ?? ['name', 'new', 'exists'],
      since: clock,
    };
    if (relativePath) {
      options.relative_root = relativePath;
    }
    // Try this thing out where we always set empty_on_fresh_instance. Eden will be a lot happier
    // if we never ask Watchman to do something that results in a glob(**) near the root.
    options.empty_on_fresh_instance = true;

    // relativePath is undefined if watchRoot is the same as directoryPath.
    const subscription: WatchmanSubscription = {
      root: watchRoot,
      pathFromSubscriptionRootToSubscriptionPath: relativePath,
      path: localDirectoryPath,
      name: subscriptionName,
      subscriptionCount: 1,
      options,
      emitter: new EventEmitter(),
    };
    // Register before subscribing so that notifications arriving right after
    // the subscribe command can already be routed to this record.
    this.setSubscription(subscriptionName, subscription);
    await this.subscribe(watchRoot, subscriptionName, options);
    this.logger.log('watchman subscription %s established', subscriptionName);
    this.setStatus('healthy');

    return subscription;
  }

  /**
   * Releases one reference to the subscription identified by `path` and `name`.
   * When the last reference is released the Watchman subscription is cancelled
   * and the record is dropped. Unknown subscriptions are logged and ignored.
   *
   * @param path The directory that was passed to `watchDirectoryRecursive`.
   * @param name The raw subscription name that was passed alongside it.
   */
  public async unwatch(path: string, name: string): Promise<void> {
    const subscriptionName = this.fixupName(path, name);
    const subscription = this.getSubscription(subscriptionName);

    if (subscription == null) {
      this.logger.error(`No watcher entity found with path [${path}] name [${name}]`);
      return;
    }

    if (--subscription.subscriptionCount === 0) {
      // The subscription was created against the watch *root* (which differs
      // from `subscription.path` whenever watch-project returned a
      // relative_path), so it must be cancelled against that same root.
      await this.unsubscribe(subscription.root, subscription.name).catch(err => {
        // Directory maybe doesn't exist anymore. Don't propagate error - we want to clean
        // up subscription anyway so we don't get stuck reconnecting.
        this.logger.error(`error unsubscribing watchman subscription ${subscriptionName}: ${err}`);
      });
      this.deleteSubscription(subscriptionName);
      this.logger.log(`watchman subscription destroyed: ${subscriptionName}`);
    }
  }

  /**
   * Attaches the 'end', 'error' and 'subscription' listeners to the current
   * client. Must be called once for every newly created client.
   */
  private initWatchmanClient(): void {
    this.client.on('end', () => {
      this.setStatus('ended');
      this.logger.info('Watchman client ended');
      this.client.removeAllListeners();
      this.serializedReconnect();
    });
    this.client.on('error', (error: Error) => {
      const statusBeforeError = this.status;
      this.logger.error('Error while talking to watchman: ', error);
      this.tracker.error(
        'WatchmanEvent',
        'WatchmanError',
        `Error while talking to watchman ${error}`,
      );
      this.setStatus('errored');
      // If Watchman encounters an error in the middle of a command, it may never finish!
      // The client must be immediately killed here so that the command fails and
      // `serializeAsyncCall` can be unblocked. Otherwise, we end up in a deadlock.
      this.client.removeAllListeners();
      this.client.end();
      if (statusBeforeError === 'initializing') {
        // If we get an error while we're first initializing watchman, it probably means
        // it's not installed properly. No use spamming reconnection failures too.
        return;
      }
      // Those are errors in deserializing a stream of changes.
      // The only possible recovery here is reconnecting a new client,
      // but the failed to serialize events will be missed.
      this.serializedReconnect();
    });
    this.client.on('subscription', this.onSubscriptionResult.bind(this));
  }

  /**
   * Tears down the current client, creates a fresh one and re-establishes all
   * known subscriptions on it. Rejects if any subscription cannot be restored.
   */
  private async reconnectClient(): Promise<void> {
    // If we got an error after making a subscription, the reconnect needs to
    // remove that subscription to try again, so it doesn't keep leaking subscriptions.
    this.logger.info('Ending existing watchman client to reconnect a new one');
    this.setStatus('reconnecting');
    this.client.removeAllListeners();
    this.client.end();
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    this.logger.error('Watchman client disconnected, reconnecting a new client!');
    this.initWatchmanClient();
    this.logger.info('Watchman client re-initialized, restoring subscriptions');
    await this.restoreSubscriptions();
  }

  /**
   * Re-watches and re-subscribes every known subscription (in parallel) on the
   * current client. When at least one subscription exists and all were
   * restored, the backoff delay is reset and the status becomes 'healthy'.
   */
  private async restoreSubscriptions(): Promise<void> {
    const watchSubscriptions = Array.from(this.subscriptions.values());
    const numSubscriptions = watchSubscriptions.length;
    this.logger.info(`Attempting to restore ${numSubscriptions} Watchman subscriptions.`);
    let numRestored = 0;
    await Promise.all(
      watchSubscriptions.map(async (subscription: WatchmanSubscription, index: number) => {
        // Note that this call to `watchman watch-project` could fail if the
        // subscription.path has been unmounted/deleted.
        await this.watchProject(subscription.path);

        // Give the (possibly restarted) watchman time to settle before subscribing.
        await sleep(WATCHMAN_SETTLE_TIME_MS);

        // Register the subscriptions after the filesystem settles.
        const {name, options, root} = subscription;

        // Assuming we had previously connected and gotten an event, we can
        // reconnect `since` that time, so that we get any events we missed.
        // Otherwise fall back to the current clock.
        subscription.options.since = this.lastKnownClockTimes.get(root) || (await this.clock(root));

        this.logger.info(`Subscribing to ${name}: (${index + 1}/${numSubscriptions})`);
        await this.subscribe(root, name, options);
        ++numRestored;
        this.logger.info(`Subscribed to ${name}: (${numRestored}/${numSubscriptions}) complete.`);
      }),
    );
    if (numRestored > 0 && numRestored === numSubscriptions) {
      this.logger.info('Successfully reconnected all %d subscriptions.', numRestored);
      // if everything got restored, reset the reconnect backoff time
      this.reconnectDelayMs = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
      this.setStatus('healthy');
    }
  }

  /** Looks up a subscription record by its (normalized) key. */
  private getSubscription(entryPath: string): WatchmanSubscription | undefined {
    return this.subscriptions.get(path.normalize(entryPath));
  }

  /** Stores a subscription record under its (normalized) key. */
  private setSubscription(entryPath: string, subscription: WatchmanSubscription): void {
    this.subscriptions.set(path.normalize(entryPath), subscription);
  }

  /** Removes the subscription record stored under the (normalized) key. */
  private deleteSubscription(entryPath: string): void {
    this.subscriptions.delete(path.normalize(entryPath));
  }

  /**
   * Handles one unilateral 'subscription' payload from the client: remembers
   * its clock, then routes it to the matching subscription's emitter, triggers
   * a reconnect on cancellation, or logs a state transition.
   */
  private onSubscriptionResult(response: WatchmanSubscriptionResponse): void {
    const subscription = this.getSubscription(response.subscription);
    if (subscription == null) {
      this.logger.error('Subscription not found for response:!', response);
      return;
    }

    // save the clock time of this event in case we disconnect in the future
    if (response.root != null && response.clock != null) {
      this.lastKnownClockTimes.set(response.root, response.clock);
    }

    if (response.is_fresh_instance === true) {
      this.logger.warn(
        `Watch for ${response.root} (${response.subscription}) returned an empty fresh instance.`,
      );
      subscription.emitter.emit('fresh-instance');
      return;
    }
    if (Array.isArray(response.files)) {
      subscription.emitter.emit('change', response.files);
      return;
    }
    if (response.canceled === true) {
      this.logger.info(`Watch for ${response.root} was deleted: triggering a reconnect.`);
      // Ending the client will trigger a reconnect.
      this.client.end();
      return;
    }

    const stateEnter = response['state-enter'];
    const stateLeave = response['state-leave'];
    if (stateEnter == null && stateLeave == null) {
      // Not a state transition either; there is nothing meaningful to report
      // (and we must not log a bogus "Leaving undefined").
      return;
    }
    // Only log state transitions once per watchmanClient, since otherwise we'll just get 8x of this message spammed
    if (firstOfIterable(this.subscriptions.values()) === subscription) {
      // TODO(most): use state messages to decide on when to send updates.
      const stateMessage = stateEnter != null ? `Entering ${stateEnter}` : `Leaving ${stateLeave}`;
      const numSubscriptions = this.subscriptions.size;
      this.logger.info(`Subscription state: ${stateMessage} (${numSubscriptions})`);
    }
  }

  /**
   * Builds the Watchman subscription name for a (directory, name) pair by
   * replacing every path separator (`\` or `/`) in the directory with `-` and
   * appending `-<name>`.
   */
  private fixupName(directoryPath: string, name: string): string {
    const refinedPath = directoryPath.replace(/[\\/]/g, '-');
    return `${refinedPath}-${name}`;
  }

  /** Issues `unsubscribe <watchRoot> <subscriptionName>`. */
  private unsubscribe(watchRoot: string, subscriptionName: string): Promise<unknown> {
    return this.command('unsubscribe', watchRoot, subscriptionName);
  }

  /**
   * Issues `watch-project <directoryPath>` and logs any warning Watchman
   * attaches to the response.
   *
   * @returns The watch root and, when `directoryPath` is below that root, the
   *   relative path from the root to it.
   */
  private async watchProject(
    directoryPath: string,
  ): Promise<{watch: string; relative_path?: string}> {
    const response = (await this.command('watch-project', directoryPath)) as {
      watch: string;
      relative_path?: string;
      warning?: string;
    };
    if (response.warning) {
      this.logger.error('watchman warning: ', response.warning);
    }
    return response;
  }

  /** Issues `clock <directoryPath>` and returns the reported clock value. */
  private async clock(directoryPath: string): Promise<string> {
    const {clock} = (await this.command('clock', directoryPath)) as {clock: string};
    return clock;
  }

  /**
   * Issues `subscribe <watchRoot> <subscriptionName> <options>`.
   * Resolves with Watchman's raw response, which callers currently ignore.
   */
  private subscribe(
    watchRoot: string,
    subscriptionName: string | undefined,
    options: WatchmanSubscriptionOptions,
  ): Promise<unknown> {
    this.logger.info(
      `Creating Watchman subscription ${String(subscriptionName)} under ${watchRoot}`,
      JSON.stringify(options),
    );
    return this.command('subscribe', watchRoot, subscriptionName, options);
  }

  /**
   * Promisify calls to watchman client.
   * Failures are logged (with the command arguments) and then re-thrown.
   */
  private async command(...args: Array<unknown>): Promise<unknown> {
    try {
      return await new Promise((resolve, reject) => {
        this.client.command(args, (error, response) => (error ? reject(error) : resolve(response)));
      });
    } catch (error) {
      this.logger.error('Watchman command error: ', args, error);
      throw error;
    }
  }
}