/**
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
7
8import type {EnsureAssignedTogether} from 'shared/EnsureAssignedTogether';
9import type {Operation} from './operations/Operation';
10import type {Disposable, Hash, ProgressStep, ServerToClientMessage} from './types';
11
12import {atom} from 'jotai';
13import {useCallback} from 'react';
14import {defer} from 'shared/utils';
15import serverAPI from './ClientToServerAPI';
16import {atomFamilyWeak, readAtom, writeAtom} from './jotaiUtils';
17import {atomResetOnCwdChange} from './repositoryData';
18import {applicationinfo} from './serverAPIState';
19import {Timer} from './timer';
20import {registerCleanup, registerDisposable, short} from './utils';
21
/**
 * Client-side record for a single operation: the operation itself plus the
 * output, progress and optimistic-state bookkeeping gathered while it runs.
 *
 * `endTime` and `exitCode` are declared through `EnsureAssignedTogether`;
 * presumably that helper forces them to be set (or unset) as a pair —
 * confirm in `shared/EnsureAssignedTogether`.
 */
export type OperationInfo = {
  /** The operation this record describes. */
  operation: Operation;
  /** When the client started tracking this operation. */
  startTime?: Date;
  /** Output chunks collected for the command, in arrival order. */
  commandOutput?: Array<string>;
  /** Most recent progress step reported for the command. */
  currentProgress?: ProgressStep;
  /** Progress message shown next to a commit. */
  inlineProgress?: Map<Hash, string>;
  /**
   * True once an "abort" request has been sent.
   * The process might have exited already or is going to exit soon.
   */
  aborting?: boolean;
  /** True when the process has exited AND no optimistic commit state remains to show. */
  hasCompletedOptimisticState?: boolean;
  /** True when the process has exited AND no optimistic uncommitted-changes state remains to show. */
  hasCompletedUncommittedChangesOptimisticState?: boolean;
  /** True when the process has exited AND no optimistic merge-conflict state remains to show. */
  hasCompletedMergeConflictsOptimisticState?: boolean;
  /** Warnings reported while the operation ran. */
  warnings?: Array<string>;
} & EnsureAssignedTogether<{
  endTime: Date;
  exitCode: number;
}>;
42
/**
 * Sentinel exit code used when the process is known to have exited but its
 * real exit code is unknown.
 * Exit codes are usually a single byte, so `-1024` is unlikely to collide
 * with a genuine one.
 */
export const EXIT_CODE_FORGET = -1024;
48
/**
 * The current operation bundled with the history of earlier ones, so that
 * both can be updated together as a single piece of state.
 */
export interface OperationList {
  /**
   * The operation that is running right now, or, when nothing is running,
   * the one that ran most recently.
   */
  currentOperation: OperationInfo | undefined;
  /** Earlier operations ordered oldest to newest; never contains `currentOperation`. */
  operationHistory: Array<OperationInfo>;
}
/** Build a fresh, empty operation list: nothing running and no history. */
const defaultOperationList = () => {
  return {currentOperation: undefined, operationHistory: []};
};
60
/**
 * Compute the operation list that results from starting `newOperation`.
 *
 * If `newOperation` is already the current operation (same id), a shallow copy
 * of `list` is returned unchanged. Otherwise the previous current operation
 * (if any) is appended to the history and `newOperation` becomes current.
 *
 * @param newOperation the operation to start tracking
 * @param list the list to derive the new state from; it is not mutated
 * @returns a new `OperationList`
 */
function startNewOperation(newOperation: Operation, list: OperationList): OperationList {
  const previous = list.currentOperation;
  if (previous?.operation.id === newOperation.id) {
    // Already tracked optimistically as the running operation; don't duplicate it.
    return {...list};
  }

  // Retire the previous current operation (if there was one) into the history.
  const operationHistory =
    previous != null ? [...list.operationHistory, previous] : [...list.operationHistory];

  // Inline progress is keyed by short hashes; do that conversion here on behalf of operations.
  const initialProgress = newOperation.getInitialInlineProgress?.();
  const shortened: Array<[string, string]> | undefined = initialProgress?.map(
    ([hash, message]) => [short(hash), message],
  );

  const currentOperation: OperationInfo = {
    operation: newOperation,
    startTime: new Date(),
    inlineProgress: shortened == null ? undefined : new Map(shortened),
  };
  return {...list, operationHistory, currentOperation};
}
82
/**
 * Ask the server for any progress we missed on the current operation.
 * The server might answer with a "forgot" progress message, which lets us mark
 * the operation as exited; useful when it exited while we were disconnected.
 * Sends nothing when there is no current operation.
 */
export function maybeRemoveForgottenOperation() {
  const operationId = readAtom(operationList).currentOperation?.operation.id;
  if (operationId == null) {
    return;
  }
  serverAPI.postMessage({
    type: 'requestMissedOperationProgress',
    operationId,
  });
}
99
/**
 * Current operation plus history of previous operations.
 * Created with `atomResetOnCwdChange`, starting from an empty list.
 */
export const operationList = atomResetOnCwdChange<OperationList>(defaultOperationList());

// Whenever the server API runs its setup callbacks, check whether the
// operation we believe is current is still known to the server.
registerCleanup(
  operationList,
  serverAPI.onSetup(() => {
    maybeRemoveForgottenOperation();
  }),
  import.meta.hot,
);
/**
 * Replace the current operation with the result of `update`.
 * When no operation is being tracked, the list is returned as-is and `update` is not called.
 */
function updateCurrentOperation(
  update: (currentOperation: OperationInfo) => OperationInfo,
): void {
  writeAtom(operationList, current => {
    const currentOperation = current.currentOperation;
    if (currentOperation == null) {
      return current;
    }
    return {...current, currentOperation: update(currentOperation)};
  });
}

/**
 * Keep `operationList` in sync with 'operationProgress' messages from the server:
 * - spawn: start tracking the operation (if we know it by id)
 * - stdout / stderr: append to the command output
 * - inlineProgress: update per-commit progress messages (and mirror them into the output)
 * - progress: update the current progress step (and mirror new messages into the output)
 * - warning: collect warnings
 * - exit / forgot: mark the matching, not-yet-exited operation as finished
 */
registerDisposable(
  operationList,
  serverAPI.onMessageOfType('operationProgress', progress => {
    switch (progress.kind) {
      case 'spawn':
        writeAtom(operationList, list => {
          const operation = operationsById.get(progress.id);
          if (operation == null) {
            // We never ran this operation from this client; nothing to track.
            return list;
          }

          return startNewOperation(operation, list);
        });
        break;
      case 'stdout':
      case 'stderr':
        updateCurrentOperation(currentOperation => ({
          ...currentOperation,
          commandOutput: [...(currentOperation.commandOutput ?? []), progress.message],
          currentProgress: undefined, // hide progress on new stdout, so it doesn't appear stuck
        }));
        break;
      case 'inlineProgress':
        updateCurrentOperation(currentOperation => {
          let inlineProgress: undefined | Map<string, string>;
          if (progress.hash) {
            // Copy the map instead of mutating the one already stored in state,
            // so previous snapshots of the state are left intact.
            inlineProgress = new Map(currentOperation.inlineProgress);
            if (progress.message) {
              inlineProgress.set(progress.hash, progress.message);
            } else {
              inlineProgress.delete(progress.hash);
            }
          } else {
            // No hash: clear all inline progress.
            inlineProgress = undefined;
          }

          // Also add the inline progress message as if it was on stdout,
          // so you can see it when reading back the final output.
          const commandOutput =
            progress.hash && progress.message
              ? [
                  ...(currentOperation.commandOutput ?? []),
                  `${progress.hash} - ${progress.message}\n`,
                ]
              : currentOperation.commandOutput;

          return {
            ...currentOperation,
            inlineProgress,
            commandOutput,
          };
        });
        break;
      case 'progress':
        updateCurrentOperation(currentOperation => {
          const newCommandOutput = [...(currentOperation.commandOutput ?? [])];
          if (newCommandOutput.at(-1)?.trim() !== progress.progress.message) {
            // Also add the progress message as if it was on stdout,
            // so you can see it when reading back the final output,
            // but only if it's a different progress message than we've seen.
            newCommandOutput.push(progress.progress.message + '\n');
          }

          return {
            ...currentOperation,
            commandOutput: newCommandOutput,
            currentProgress: progress.progress,
          };
        });
        break;
      case 'warning':
        updateCurrentOperation(currentOperation => ({
          ...currentOperation,
          warnings: [...(currentOperation.warnings ?? []), progress.warning],
        }));
        break;
      case 'exit':
      case 'forgot':
        writeAtom(operationList, current => {
          const currentOperation = current.currentOperation;

          // Only an operation with the matching id that hasn't been marked as exited
          // may be finished by this message.
          const isUnfinishedMatch = (info: OperationInfo) =>
            info.operation.id === progress.id && info.exitCode == null;

          let operationThatExited: OperationInfo | undefined;
          if (currentOperation != null && isUnfinishedMatch(currentOperation)) {
            operationThatExited = currentOperation;
          } else {
            // We've seen cases where we somehow got this exit out of order.
            // Instead of updating the currentOperation, find the matching historical operation.
            operationThatExited = current.operationHistory.find(isUnfinishedMatch);

            window.globalIslClientTracker.track('ExitMessageOutOfOrder', {
              extras: {
                operationThatExited: operationThatExited?.operation.trackEventName,
              },
            });
          }

          if (operationThatExited == null) {
            // Nothing matches this message (unknown id, or the operation was already
            // marked as exited, e.g. a late 'forgot'). Don't overwrite the exit
            // information of an unrelated or already-finished operation.
            return current;
          }

          const {exitCode, timestamp} =
            progress.kind === 'exit'
              ? progress
              : {exitCode: EXIT_CODE_FORGET, timestamp: Date.now()};

          // Notify whoever is awaiting this operation, then forget the callback.
          const complete = operationCompletionCallbacks.get(operationThatExited.operation.id);
          complete?.(
            exitCode === 0 ? undefined : new Error(`Process exited with code ${exitCode}`),
          );
          operationCompletionCallbacks.delete(operationThatExited.operation.id);

          const updatedOperation = {
            ...operationThatExited,
            exitCode,
            endTime: new Date(timestamp),
            inlineProgress: undefined, // inline progress never lasts after exiting
          };

          if (operationThatExited === currentOperation) {
            return {
              ...current,
              currentOperation: updatedOperation,
            };
          }
          return {
            ...current,
            operationHistory: current.operationHistory.map(op =>
              op === operationThatExited ? updatedOperation : op,
            ),
          };
        });
        break;
    }
  }),
  import.meta.hot,
);
289
/**
 * Operations that were dropped from the queue because an earlier operation failed.
 *
 * When an operation in the queue fails, all further queued operations are removed.
 * The removed operations are kept here, together with the error and the operation
 * that caused it, so the UI can show a warning about which actions were "reverted"
 * and the user can recreate those steps.
 * `undefined` means there is nothing to report.
 */
export const queuedOperationsErrorAtom = atomResetOnCwdChange<
  | undefined
  | {error: Error; operationThatErrored: Operation | undefined; operations: Array<Operation>}
>(undefined);
297
/**
 * Per-commit atom yielding the inline progress message of the current operation
 * for `hash`, or `undefined` when there is none.
 * Progress messages come indexed by short hash, so the lookup uses `short(hash)`.
 */
export const inlineProgressByHash = atomFamilyWeak((hash: Hash) =>
  atom(get => {
    const progressByShortHash = get(operationList).currentOperation?.inlineProgress;
    return progressByShortHash == null ? undefined : progressByShortHash.get(short(hash));
  }),
);
309
/** The operation currently being previewed, or `undefined` when no preview is active. */
export const operationBeingPreviewed = atomResetOnCwdChange<undefined | Operation>(undefined);
311
/**
 * Recently run operations, keyed by operation id.
 *
 * Entire operations are not sent to the server, since not all fields are serializable.
 * So when the server refers to an operation (e.g. when describing the queue), this map
 * is how we find out which operation it's talking about.
 * Entries are added whenever an operation is run and removed once its process exits
 * (successfully or not).
 */
const operationsById = new Map<string, Operation>();

/**
 * Callbacks to invoke when an operation completes, keyed by operation id.
 * Kept outside of the operation itself since Operations are typically immutable.
 */
const operationCompletionCallbacks = new Map<string, (error?: Error) => void>();
319
/**
 * Subscribe to operations exiting. Handy for cases where an operation fails
 * and the UI should reset so the user can try again.
 *
 * @param cb invoked with the 'exit' message and the matching operation; it is
 *   skipped when the exiting operation's id is not known to this client.
 * @returns a Disposable for the underlying message subscription
 */
export function onOperationExited(
  cb: (
    message: ServerToClientMessage & {type: 'operationProgress'; kind: 'exit'},
    operation: Operation,
  ) => unknown,
): Disposable {
  return serverAPI.onMessageOfType('operationProgress', progress => {
    if (progress.kind !== 'exit') {
      return;
    }
    const exitedOperation = operationsById.get(progress.id);
    if (!exitedOperation) {
      return;
    }
    cb(progress, exitedOperation);
  });
}
339
/**
 * Wait for the UI to become "idle".
 *
 * Returns `undefined` right away when no operation is running or queued.
 * Otherwise returns a Promise that resolves once no operation is running and
 * nothing remains queued.
 * Only waits for commands to finish, not for optimistic state to be resolved.
 */
export function waitForNothingRunning(): Promise<void> | undefined {
  const currentOperation = readAtom(operationList).currentOperation;
  const isRunning = currentOperation != null && currentOperation?.exitCode == null;
  const hasQueued = readAtom(queuedOperations).length > 0;
  if (isRunning || hasQueued) {
    // Resolve on the first 'exit' message that satisfies one of:
    //  - it exited non-zero: everything queued will be cancelled anyway
    //  - the queue is empty at that point: that was the last operation
    const idle = serverAPI.nextMessageMatching('operationProgress', msg => {
      if (msg.kind !== 'exit') {
        return false;
      }
      return msg.exitCode !== 0 || readAtom(queuedOperations).length === 0;
    });
    return idle.then(() => undefined);
  }
  // nothing running, nothing queued
  return undefined;
}
364
/** Operations waiting to run after the current one, in queue order. */
export const queuedOperations = atomResetOnCwdChange<Array<Operation>>([]);

// Keep the queue (and the queue error state) in sync with 'operationProgress' messages.
registerDisposable(
  queuedOperations,
  serverAPI.onMessageOfType('operationProgress', progress => {
    if (progress.kind === 'queue' || progress.kind === 'spawn') {
      // Spawning doubles as our notification to dequeue the next operation, and includes
      // the new queue state. We also expect a queue update whenever we try to run a command
      // but it gets queued. Either way, adopt the server's view of the queue, dropping ids
      // we don't know about.
      writeAtom(queuedOperations, () =>
        progress.queue
          .map(opId => operationsById.get(opId))
          .filter((op): op is Operation => op != null),
      );
      // We can now clear the queued commands error. The error would have already been shown
      // and then further acted on. This wouldn't happen automatically, so we consider this an
      // explicit user acknowledgement. It also means this error state and the queuedOperations
      // state should be mutually exclusive.
      writeAtom(queuedOperationsErrorAtom, undefined);
      return;
    }

    if (progress.kind === 'error') {
      saveQueuedOperationsOnError(progress.id, new Error(progress.error));
      // empty the queue when a command hits an error
      writeAtom(queuedOperations, []);
      return;
    }

    if (progress.kind === 'exit') {
      // We don't need to care about this operation anymore after this tick,
      // once all other callsites processing 'operationProgress' messages have run.
      setTimeout(() => {
        operationsById.delete(progress.id);
      });

      const exitedWithError = progress.exitCode != null && progress.exitCode !== 0;
      if (exitedWithError) {
        saveQueuedOperationsOnError(progress.id, new Error('command exited with non-zero code'));
        // if any process in the queue exits with an error, the entire queue is cleared.
        writeAtom(queuedOperations, []);
      }
    }
  }),
  import.meta.hot,
);
407
/**
 * Record the operations that are still queued at the time `operationIdThatErrored`
 * failed, so they can be shown as having been cancelled.
 *
 * This may be called twice for the same operation (on 'error', then also on 'exit').
 * If the stored error already belongs to that operation it is left alone, so the
 * second call doesn't clear it even though the queue is empty by then.
 *
 * @param operationIdThatErrored id of the operation that failed
 * @param error the failure to store alongside the cancelled operations
 */
function saveQueuedOperationsOnError(operationIdThatErrored: string, error: Error) {
  const stillQueued = readAtom(queuedOperations);

  const recordedId = readAtom(queuedOperationsErrorAtom)?.operationThatErrored?.id;
  if (recordedId === operationIdThatErrored) {
    return;
  }

  if (stillQueued.length === 0) {
    // invariant: queuedOperationsError.operations should never be [], rather the whole thing is undefined
    writeAtom(queuedOperationsErrorAtom, undefined);
    return;
  }

  writeAtom(queuedOperationsErrorAtom, {
    operationThatErrored: operationsById.get(operationIdThatErrored),
    error,
    operations: stillQueued,
  });
}
425
/**
 * Look up the tracked info for `operation` (compared by identity).
 * The current operation is checked first, then the history.
 *
 * @returns the matching `OperationInfo`, or `undefined` if the operation isn't tracked
 */
export function getLatestOperationInfo(operation: Operation): OperationInfo | undefined {
  const {currentOperation, operationHistory} = readAtom(operationList);
  if (currentOperation?.operation === operation) {
    return currentOperation;
  }
  return operationHistory.find(info => info.operation === operation);
}
435
/**
 * Ask the server to run `operation` and track it on the client.
 *
 * If another operation is still running, `operation` is added to the queue
 * optimistically; otherwise it becomes the current operation right away.
 *
 * @returns a promise that resolves (never rejects here) once the operation's
 *   completion callback fires: with `undefined` on success, or with an `Error`
 *   on failure.
 */
function runOperationImpl(operation: Operation): Promise<undefined | Error> {
  // TODO: check for hashes in arguments that are known to be obsolete already,
  // and mark those to not be rewritten.
  serverAPI.postMessage({
    type: 'runOperation',
    operation: operation.getRunnableOperation(),
  });

  // Completion is reported through the callback map when an exit/forgot message is processed.
  const completion = defer<undefined | Error>();
  operationCompletionCallbacks.set(operation.id, (err?: Error) => {
    completion.resolve(err);
  });

  // Remember the operation so server messages that only carry its id can be resolved.
  operationsById.set(operation.id, operation);

  const ongoing = readAtom(operationList);
  const somethingStillRunning =
    ongoing?.currentOperation != null && ongoing.currentOperation.exitCode == null;
  if (somethingStillRunning) {
    // Add to the queue optimistically. The server will tell us the real state of the
    // queue when it gets our run request.
    writeAtom(queuedOperations, prev => [...(prev || []), operation]);
  } else {
    // Start a new operation; the previous one is moved into the history.
    writeAtom(operationList, list => startNewOperation(operation, list));
  }

  // Check periodically with the server that the process is still running.
  // This is a fallback in case the server cannot send us "exit" messages.
  // This timer will auto disable when currentOperation becomes null.
  currentOperationHeartbeatTimer.enabled = true;

  return completion.promise;
}
466
/**
 * Heartbeat (interval argument: 5000) that asks the server whether the current
 * operation is still running. The callback returns `false` to stop the timer once
 * there is no current operation or it already has an end time.
 */
const currentOperationHeartbeatTimer = new Timer(() => {
  const tracked = readAtom(operationList).currentOperation;
  const stillRunning = tracked != null && tracked.endTime == null;
  if (!stillRunning) {
    // Stop the timer.
    return false;
  }
  maybeRemoveForgottenOperation();
}, 5000);
475
/**
 * Returns a callback to run an operation.
 * The operation will be queued by the server if other operations are already running.
 * Nothing is run when the application info reports read-only mode.
 *
 * The callback's promise resolves when the operation has exited
 * (though its optimistic state may not have finished resolving yet).
 * Note: Most callsites won't await this promise, and just use queueing. If you do,
 * you should probably pass `throwOnError = true` to detect errors.
 * TODO: should we refactor this into a separate function if you want to await the result, which always throws?
 * Note: There's no need to wait for this promise to resolve before starting another operation,
 * successive operations will queue up with a nicer UX than if you awaited each one.
 */
export function useRunOperation() {
  return useCallback(async (operation: Operation, throwOnError?: boolean): Promise<void> => {
    const isReadOnly = readAtom(applicationinfo)?.readOnly;
    if (isReadOnly) {
      return;
    }

    const failure = await runOperationImpl(operation);
    if (throwOnError && failure != null) {
      throw failure;
    }
  }, []);
}
497
/**
 * Returns a callback to abort the running operation.
 *
 * The callback always sends the abort request to the server. If `operationId`
 * is the current operation, it is also flagged as `aborting` on the client.
 */
export function useAbortRunningOperation() {
  return useCallback((operationId: string) => {
    serverAPI.postMessage({
      type: 'abortRunningOperation',
      operationId,
    });
    const ongoing = readAtom(operationList);
    if (ongoing?.currentOperation?.operation?.id === operationId) {
      // Mark 'aborting' as true.
      writeAtom(operationList, list => {
        const currentOperation = list.currentOperation;
        if (currentOperation == null) {
          return list;
        }
        // Spread first, override last: otherwise an existing `aborting` key on the
        // operation (even one set to undefined/false) would win over `true`.
        return {...list, currentOperation: {...currentOperation, aborting: true}};
      });
    }
  }, []);
}
520
/**
 * Returns a callback to either run or cancel the operation being previewed.
 * Set `operationBeingPreviewed` to start a preview.
 *
 * - `isCancel === true`: just clear the preview.
 * - otherwise: clear the preview and run `operation` if given, else the previewed operation.
 */
export function useRunPreviewedOperation() {
  return useCallback((isCancel: boolean, operation?: Operation) => {
    // Read the previewed operation before clearing it; it's only needed when not cancelling
    // and no explicit operation was passed.
    const operationToRun = isCancel
      ? undefined
      : operation ?? readAtom(operationBeingPreviewed);

    writeAtom(operationBeingPreviewed, undefined);

    if (!isCancel && operationToRun) {
      runOperationImpl(operationToRun);
    }
  }, []);
}
539
/**
 * Returns a callback that clears out all ongoing optimistic state from operations
 * that have already exited.
 *
 * Optimistic state can be incorrect, e.g. if some assumption about a command is wrong
 * in an edge case but the command doesn't exit non-zero. This is a backdoor to discard it.
 * Queued commands and a still-running current command are not affected.
 */
export function useClearAllOptimisticState() {
  return useCallback(() => {
    writeAtom(operationList, list => {
      // Exited history entries get all three "completed" flags; entries that
      // already have them (or haven't exited) are kept as the same object.
      const operationHistory = list.operationHistory.map(info => {
        if (info.exitCode == null) {
          return info;
        }
        const alreadyCleared =
          info.hasCompletedOptimisticState &&
          info.hasCompletedUncommittedChangesOptimisticState &&
          info.hasCompletedMergeConflictsOptimisticState;
        if (alreadyCleared) {
          return info;
        }
        return {
          ...info,
          hasCompletedOptimisticState: true,
          hasCompletedUncommittedChangesOptimisticState: true,
          hasCompletedMergeConflictsOptimisticState: true,
        };
      });

      // The current operation is always copied; flags are only set once it has exited.
      const previous = list.currentOperation;
      let currentOperation: OperationInfo | undefined;
      if (previous == null) {
        currentOperation = undefined;
      } else if (previous.exitCode != null) {
        currentOperation = {
          ...previous,
          hasCompletedOptimisticState: true,
          hasCompletedUncommittedChangesOptimisticState: true,
          hasCompletedMergeConflictsOptimisticState: true,
        };
      } else {
        currentOperation = {...previous};
      }

      return {currentOperation, operationHistory};
    });
  }, []);
}
579