| 1 | /** |
| 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| 3 | * |
| 4 | * This source code is licensed under the MIT license found in the |
| 5 | * LICENSE file in the root directory of this source tree. |
| 6 | */ |
| 7 | |
| 8 | import type {Logger} from './logger'; |
| 9 | |
| 10 | import watchman from 'fb-watchman'; |
| 11 | import {EventEmitter} from 'node:events'; |
| 12 | import path from 'node:path'; |
| 13 | import {type ServerSideTracker} from './analytics/serverSideTracker'; |
| 14 | import {firstOfIterable, serializeAsyncCall, sleep} from './utils'; |
| 15 | |
/**
 * Options object sent along with the watchman `subscribe` command.
 *
 * `Watchman.watchDirectoryRecursive` copies whatever the caller supplies and
 * then fills in `fields` (only when the caller gave none), `since`,
 * `relative_root` and `empty_on_fresh_instance` itself. The remaining keys are
 * passed to watchman untouched; see the watchman `subscribe` documentation for
 * their meaning.
 */
export type WatchmanSubscriptionOptions = {
  /** Per-file attributes to report. Defaults to `['name', 'new', 'exists']` when omitted. */
  fields?: string[];
  /** Watchman query expression, forwarded as-is. */
  expression?: unknown[];
  /** Clock value to report changes from; overwritten on every (re)subscribe. */
  since?: string;
  /** Forwarded as-is to watchman. */
  defer?: string[];
  /** Forwarded as-is to watchman. */
  defer_vcs?: boolean;
  /** Set to the `relative_path` returned by `watch-project`, when there is one. */
  relative_root?: string;
  /** Always forced to `true` by `watchDirectoryRecursive`. */
  empty_on_fresh_instance?: boolean;
};
| 25 | |
/**
 * Bookkeeping record for one named watchman subscription, shared by every
 * caller that watches the same directory under the same name.
 */
export type WatchmanSubscription = {
  /** Unique subscription name (directory path and raw name combined). */
  name: string;
  /** The directory the caller asked to watch. */
  path: string;
  /** The watch root reported by `watch-project`; the subscription is registered against it. */
  root: string;
  /**
   * Relative path leading from the subscription root to the subscription path,
   * i.e. the 'relative_path' described at
   * https://facebook.github.io/watchman/docs/cmd/watch-project.html#using-watch-project.
   * It must be undefined when the subscription root and the subscription path
   * are the same directory.
   */
  pathFromSubscriptionRootToSubscriptionPath: string | undefined;
  /** Options the subscription was (and will be re-)registered with. */
  options: WatchmanSubscriptionOptions;
  /** Number of outstanding watchers; the subscription is torn down when it reaches zero. */
  subscriptionCount: number;
  /** Emits 'change' (with the changed files) and 'fresh-instance'. */
  emitter: EventEmitter;
};
| 42 | |
/**
 * Payload delivered with the watchman client's 'subscription' event.
 * Only `root` and `subscription` are always present; which of the optional
 * keys is set decides how the response is handled.
 */
export type WatchmanSubscriptionResponse = {
  /** Name of the subscription this response belongs to. */
  subscription: string;
  /** Watch root the response originates from. */
  root: string;
  /** Clock value of this response; remembered per root so a reconnect can resume from it. */
  clock?: string;
  /** When true, a 'fresh-instance' event is emitted instead of a 'change' event. */
  is_fresh_instance?: boolean;
  /** Changed files, emitted to listeners as a 'change' event. */
  files?: FileChange[];
  /** When true, the watch was deleted and the client is ended to force a reconnect. */
  canceled?: boolean;
  /** Name of a state being entered (only logged). */
  'state-enter'?: string;
  /** Name of a state being left (only logged). */
  'state-leave'?: string;
};
| 53 | |
/**
 * One entry of a subscription response's `files` list. Its keys mirror the
 * default `fields` (`name`, `new`, `exists`) requested when a caller does not
 * specify its own.
 */
export type FileChange = {
  /** Reported file name. */
  name: string;
  /** Watchman's `exists` attribute for the file. */
  exists: boolean;
  /** Watchman's `new` attribute for the file. */
  new: boolean;
};
| 59 | |
/** How long to wait after re-watching a project before re-subscribing. */
const WATCHMAN_SETTLE_TIME_MS = 2_500;
/** Initial reconnect backoff; also the value the backoff is reset to after a full restore. */
const DEFAULT_WATCHMAN_RECONNECT_DELAY_MS = 100;
/** Upper bound for the exponentially growing reconnect backoff (one minute). */
const MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS = 60_000;
| 63 | |
/**
 * Wrapper around an `fb-watchman` client that manages named, reference-counted
 * directory subscriptions and transparently reconnects (with exponential
 * backoff) when the client ends or errors, restoring every known subscription
 * on the replacement client.
 *
 * Listeners receive file changes through each subscription's `emitter`
 * ('change' and 'fresh-instance' events). Connection health is exposed through
 * `status` and the optional `onStatusChange` callback.
 */
export class Watchman {
  private client: watchman.Client;

  /** Reconnect-until-success routine, wrapped by `serializeAsyncCall`. */
  private serializedReconnect: () => Promise<void>;
  /** Current backoff delay; doubled after every failed reconnect attempt. */
  private reconnectDelayMs: number = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
  /** Known subscriptions, keyed by normalized subscription name. */
  private subscriptions: Map<string, WatchmanSubscription> = new Map();
  /** Most recent clock seen per watch root, used to resume after a reconnect. */
  private lastKnownClockTimes: Map<string, string> = new Map();

  public readonly status: 'initializing' | 'reconnecting' | 'healthy' | 'ended' | 'errored' =
    'initializing';

  /** Invoked with the new status every time `setStatus` runs. */
  public onStatusChange: ((status: typeof this.status) => void) | undefined;

  constructor(
    private logger: Logger,
    private tracker: ServerSideTracker,
  ) {
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    this.initWatchmanClient();
    this.serializedReconnect = serializeAsyncCall(async () => {
      let tries = 0;
      // Keep trying until a reconnect (including subscription restore) succeeds.
      while (true) {
        try {
          // eslint-disable-next-line no-await-in-loop
          await this.reconnectClient();
          return;
        } catch (error) {
          this.logger.warn(
            `reconnectClient failed (try #${tries}):`,
            error instanceof Error ? error.message : error,
          );
          tries++;

          this.reconnectDelayMs *= 2; // exponential backoff
          if (this.reconnectDelayMs > MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS) {
            this.reconnectDelayMs = MAXIMUM_WATCHMAN_RECONNECT_DELAY_MS;
          }

          this.logger.info(
            'Calling reconnectClient from _serializedReconnect in %dms',
            this.reconnectDelayMs,
          );
          // eslint-disable-next-line no-await-in-loop
          await sleep(this.reconnectDelayMs);
        }
      }
    });
  }

  /** Logs and records the new status, then notifies `onStatusChange` if set. */
  setStatus(status: typeof this.status): void {
    this.logger.log('Watchman status: ', status);
    // `status` is readonly for consumers; this is the single place that writes it.
    (this.status as string) = status;
    this.onStatusChange?.(status);
  }

  /**
   * Starts watching a directory, or joins an existing subscription for the
   * same directory and name (bumping its reference count).
   *
   * @param localDirectoryPath directory to watch
   * @param rawSubscriptionName caller-chosen name; combined with the path to form the unique name
   * @param subscriptionOptions extra watchman subscribe options (`since`,
   *   `relative_root` and `empty_on_fresh_instance` are always overwritten)
   * @returns the shared subscription record, whose `emitter` reports changes
   * @throws if any of the underlying watchman commands fails
   */
  public async watchDirectoryRecursive(
    localDirectoryPath: string,
    rawSubscriptionName: string,
    subscriptionOptions?: WatchmanSubscriptionOptions,
  ): Promise<WatchmanSubscription> {
    // Subscriptions should be unique by name and by folder
    const subscriptionName = this.fixupName(localDirectoryPath, rawSubscriptionName);
    const existingSubscription = this.getSubscription(subscriptionName);
    if (existingSubscription) {
      existingSubscription.subscriptionCount++;
      return existingSubscription;
    }

    const {watch: watchRoot, relative_path: relativePath} =
      await this.watchProject(localDirectoryPath);
    const clock = await this.clock(watchRoot);
    const options: WatchmanSubscriptionOptions = {
      ...subscriptionOptions,
      // Do not add `mode` here, it is very unfriendly to watches on Eden (see https://fburl.com/0z023yy0)
      fields: subscriptionOptions?.fields ?? ['name', 'new', 'exists'],
      since: clock,
    };
    if (relativePath) {
      options.relative_root = relativePath;
    }
    // Try this thing out where we always set empty_on_fresh_instance. Eden will be a lot happier
    // if we never ask Watchman to do something that results in a glob(**) near the root.
    options.empty_on_fresh_instance = true;

    // relativePath is undefined if watchRoot is the same as directoryPath.
    const subscription: WatchmanSubscription = {
      root: watchRoot,
      pathFromSubscriptionRootToSubscriptionPath: relativePath,
      path: localDirectoryPath,
      name: subscriptionName,
      subscriptionCount: 1,
      options,
      emitter: new EventEmitter(),
    };
    // Register before subscribing so that responses arriving right away find their entry.
    this.setSubscription(subscriptionName, subscription);
    await this.subscribe(watchRoot, subscriptionName, options);
    this.logger.log('watchman subscription %s established', subscriptionName);
    this.setStatus('healthy');

    return subscription;
  }

  /**
   * Releases one reference to a subscription. When the last reference is
   * released, the subscription is removed from watchman and forgotten locally.
   * Unknown subscriptions are logged and otherwise ignored.
   *
   * @param path directory that was passed to `watchDirectoryRecursive`
   * @param name raw subscription name that was passed to `watchDirectoryRecursive`
   */
  public async unwatch(path: string, name: string): Promise<void> {
    const subscriptionName = this.fixupName(path, name);
    const subscription = this.getSubscription(subscriptionName);

    if (subscription == null) {
      this.logger.error(`No watcher entity found with path [${path}] name [${name}]`);
      return;
    }

    if (--subscription.subscriptionCount === 0) {
      // The subscription was registered against the watch root (which may be a
      // parent of `subscription.path`), so it must be removed against that same
      // root; unsubscribing against a nested directory would not match it.
      await this.unsubscribe(subscription.root, subscription.name).catch(err => {
        // Directory maybe doesn't exist anymore. Don't propagate error - we want to clean
        // up subscription anyway so we don't get stuck reconnecting.
        this.logger.error(`error unsubscribing watchman subscription ${subscriptionName}: ${err}`);
      });
      this.deleteSubscription(subscriptionName);
      this.logger.log(`watchman subscription destroyed: ${subscriptionName}`);
    }
  }

  /** Attaches the 'end', 'error' and 'subscription' handlers to the current client. */
  private initWatchmanClient(): void {
    this.client.on('end', () => {
      this.setStatus('ended');
      this.logger.info('Watchman client ended');
      this.client.removeAllListeners();
      // Fire-and-forget: the reconnect routine retries internally until it succeeds.
      void this.serializedReconnect();
    });
    this.client.on('error', (error: Error) => {
      const statusBeforeError = this.status;
      this.logger.error('Error while talking to watchman: ', error);
      this.tracker.error(
        'WatchmanEvent',
        'WatchmanError',
        `Error while talking to watchman ${error}`,
      );
      this.setStatus('errored');
      // If Watchman encounters an error in the middle of a command, it may never finish!
      // The client must be immediately killed here so that the command fails and
      // `serializeAsyncCall` can be unblocked. Otherwise, we end up in a deadlock.
      this.client.removeAllListeners();
      this.client.end();
      if (statusBeforeError === 'initializing') {
        // If we get an error while we're first initializing watchman, it probably means
        // it's not installed properly. No use spamming reconnection failures too.
        return;
      }
      // Those are errors in deserializing a stream of changes.
      // The only possible recovery here is reconnecting a new client,
      // but the events that failed to deserialize will be missed.
      void this.serializedReconnect();
    });
    this.client.on('subscription', this.onSubscriptionResult.bind(this));
  }

  /**
   * Replaces the current client with a fresh one and re-registers every known
   * subscription on it. Rejects if any subscription cannot be restored.
   */
  private async reconnectClient(): Promise<void> {
    // If we got an error after making a subscription, the reconnect needs to
    // remove that subscription to try again, so it doesn't keep leaking subscriptions.
    this.logger.info('Ending existing watchman client to reconnect a new one');
    this.setStatus('reconnecting');
    this.client.removeAllListeners();
    this.client.end();
    this.client = new watchman.Client({
      // find watchman using PATH
      watchmanBinaryPath: undefined,
    });
    this.logger.error('Watchman client disconnected, reconnecting a new client!');
    this.initWatchmanClient();
    this.logger.info('Watchman client re-initialized, restoring subscriptions');
    await this.restoreSubscriptions();
  }

  /**
   * Re-watches and re-subscribes every known subscription on the current
   * client. All restores run concurrently and are all awaited; if any of them
   * failed, the first failure (in subscription order) is rethrown afterwards.
   * On full success the reconnect backoff is reset and the status becomes
   * 'healthy'.
   */
  private async restoreSubscriptions(): Promise<void> {
    const watchSubscriptions = Array.from(this.subscriptions.values());
    const numSubscriptions = watchSubscriptions.length;
    this.logger.info(`Attempting to restore ${numSubscriptions} Watchman subscriptions.`);
    let numRestored = 0;
    // allSettled rather than all: a single failure must not let the caller start
    // the next reconnect attempt while sibling restores are still in flight,
    // since those would then run against the replacement client.
    const results = await Promise.allSettled(
      watchSubscriptions.map(async (subscription: WatchmanSubscription, index: number) => {
        // Note that this call to `watchman watch-project` could fail if the
        // subscription.path has been unmounted/deleted.
        await this.watchProject(subscription.path);

        // Give the (possibly restarted) watchman some time to settle before
        // registering the subscription again.
        await sleep(WATCHMAN_SETTLE_TIME_MS);

        const {name, options, root} = subscription;

        // Assuming we had previously connected and gotten an event, we can
        // reconnect `since` that time, so that we get any events we missed.
        // Otherwise fall back to the current clock.
        subscription.options.since = this.lastKnownClockTimes.get(root) || (await this.clock(root));

        this.logger.info(`Subscribing to ${name}: (${index + 1}/${numSubscriptions})`);
        await this.subscribe(root, name, options);
        ++numRestored;
        this.logger.info(`Subscribed to ${name}: (${numRestored}/${numSubscriptions}) complete.`);
      }),
    );
    for (const result of results) {
      if (result.status === 'rejected') {
        throw result.reason;
      }
    }
    if (numRestored > 0 && numRestored === numSubscriptions) {
      this.logger.info('Successfully reconnected all %d subscriptions.', numRestored);
      // if everything got restored, reset the reconnect backoff time
      this.reconnectDelayMs = DEFAULT_WATCHMAN_RECONNECT_DELAY_MS;
      this.setStatus('healthy');
    }
  }

  /** Looks up a subscription by (normalized) subscription name. */
  private getSubscription(entryPath: string): WatchmanSubscription | undefined {
    return this.subscriptions.get(path.normalize(entryPath));
  }

  /** Stores a subscription under its (normalized) subscription name. */
  private setSubscription(entryPath: string, subscription: WatchmanSubscription): void {
    this.subscriptions.set(path.normalize(entryPath), subscription);
  }

  /** Forgets the subscription stored under the (normalized) subscription name. */
  private deleteSubscription(entryPath: string): void {
    this.subscriptions.delete(path.normalize(entryPath));
  }

  /**
   * Handles one 'subscription' event from the client: remembers its clock and
   * then either emits 'fresh-instance' / 'change' on the subscription's
   * emitter, ends the client when the watch was canceled (which triggers a
   * reconnect), or logs a state transition.
   */
  private onSubscriptionResult(response: WatchmanSubscriptionResponse): void {
    const subscription = this.getSubscription(response.subscription);
    if (subscription == null) {
      this.logger.error('Subscription not found for response:!', response);
      return;
    }

    // save the clock time of this event in case we disconnect in the future
    if (response.root != null && response.clock != null) {
      this.lastKnownClockTimes.set(response.root, response.clock);
    }

    if (response.is_fresh_instance === true) {
      this.logger.warn(
        `Watch for ${response.root} (${response.subscription}) returned an empty fresh instance.`,
      );
      subscription.emitter.emit('fresh-instance');
    } else if (Array.isArray(response.files)) {
      subscription.emitter.emit('change', response.files);
    } else if (response.canceled === true) {
      this.logger.info(`Watch for ${response.root} was deleted: triggering a reconnect.`);
      // Ending the client will trigger a reconnect.
      this.client.end();
    } else if (firstOfIterable(this.subscriptions.values()) === subscription) {
      // Only log state transitions once per watchmanClient (i.e. for the first
      // subscription), since otherwise we'll just get this message spammed
      // once per subscription.
      // TODO(most): use state messages to decide on when to send updates.
      const stateEnter = response['state-enter'];
      const stateLeave = response['state-leave'];
      const stateMessage = stateEnter != null ? `Entering ${stateEnter}` : `Leaving ${stateLeave}`;
      const numSubscriptions = this.subscriptions.size;
      this.logger.info(`Subscription state: ${stateMessage} (${numSubscriptions})`);
    }
  }

  /**
   * Builds the unique subscription name: the directory path with every slash
   * and backslash replaced by '-', followed by '-' and the raw name.
   */
  private fixupName(directoryPath: string, name: string): string {
    const refinedPath = directoryPath.replace(/[\\/]/g, '-');
    return `${refinedPath}-${name}`;
  }

  /** Sends `unsubscribe` for the named subscription registered against `watchRoot`. */
  private unsubscribe(watchRoot: string, subscriptionName: string): Promise<unknown> {
    return this.command('unsubscribe', watchRoot, subscriptionName);
  }

  /**
   * Sends `watch-project` for a directory and logs any warning in the reply.
   *
   * @returns the watch root, plus the relative path from that root to the
   *   directory (absent when the directory is the root itself)
   */
  private async watchProject(
    directoryPath: string,
  ): Promise<{watch: string; relative_path?: string}> {
    const response = (await this.command('watch-project', directoryPath)) as {
      watch: string;
      relative_path?: string;
      warning?: string;
    };
    if (response.warning) {
      this.logger.error('watchman warning: ', response.warning);
    }
    return response;
  }

  /** Sends `clock` for a watched root and returns the reported clock value. */
  private async clock(directoryPath: string): Promise<string> {
    const {clock} = (await this.command('clock', directoryPath)) as {clock: string};
    return clock;
  }

  /**
   * Sends `subscribe` for the given root, name and options.
   *
   * @returns the raw watchman reply (unused by callers in this class)
   */
  private subscribe(
    watchRoot: string,
    subscriptionName: string | undefined,
    options: WatchmanSubscriptionOptions,
  ): Promise<unknown> {
    this.logger.info(
      `Creating Watchman subscription ${String(subscriptionName)} under ${watchRoot}`,
      JSON.stringify(options),
    );
    return this.command('subscribe', watchRoot, subscriptionName, options);
  }

  /**
   * Promisify calls to watchman client.
   * Failures are logged together with the command arguments and then rethrown.
   */
  private async command(...args: Array<unknown>): Promise<unknown> {
    try {
      return await new Promise((resolve, reject) => {
        this.client.command(args, (error, response) => (error ? reject(error) : resolve(response)));
      });
    } catch (error) {
      this.logger.error('Watchman command error: ', args, error);
      throw error;
    }
  }
}
| 389 | |