6.4 KB182 lines
Blame
1/**
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8import type {
9 OperationCommandProgressReporter,
10 OperationProgress,
11 RunnableOperation,
12} from 'isl/src/types';
13import type {Deferred} from 'shared/utils';
14import type {ServerSideTracker} from './analytics/serverSideTracker';
15import type {RepositoryContext} from './serverTypes';
16
17import {clearTrackedCache} from 'shared/LRU';
18import {newAbortController} from 'shared/compat';
19import {defer} from 'shared/utils';
20
/**
 * Handle running & queueing all Operations so that only one Operation runs at once.
 * Operations may be run by sl in the Repository or other providers like ghstack in the RemoteRepository.
 *
 * While an operation is running, further operations are appended to a FIFO queue and
 * started one at a time as their predecessor finishes. If a running operation fails,
 * everything still waiting in the queue is dropped and reported as 'skipped'.
 */
export class OperationQueue {
  /**
   * @param runCallback Executes a single operation. It receives the context and operation
   *   given to `runOrQueueOperation`, a reporter used to forward command progress, and an
   *   `AbortSignal` that is aborted when `abortRunningOperation` is called with the id of
   *   the running operation. A rejected promise is treated as the operation failing.
   */
  constructor(
    private runCallback: (
      ctx: RepositoryContext,
      operation: RunnableOperation,
      handleProgress: OperationCommandProgressReporter,
      signal: AbortSignal,
    ) => Promise<unknown>,
  ) {}

  /** Operations waiting for their turn, oldest first. */
  private queuedOperations: Array<RunnableOperation & {tracker: ServerSideTracker}> = [];
  /** The operation currently executing, if any. */
  private runningOperation: RunnableOperation | undefined = undefined;
  /** When `runningOperation` was started. */
  private runningOperationStartTime: Date | undefined = undefined;
  /** Controller whose signal was handed to `runCallback` for the running operation. */
  private abortController: AbortController | undefined = undefined;
  /** Pending promises of callers whose operation was queued, keyed by operation id. */
  private deferredOperations = new Map<string, Deferred<'ran' | 'skipped'>>();

  /**
   * Run an operation, or if one is already running, add it to the queue.
   * Promise resolves with:
   * - 'ran', when the operation exits (no matter success/failure), even if it was enqueued.
   * - 'skipped', when the operation is never going to be run, since an earlier queued command errored.
   *
   * @param ctx Supplies the tracker and logger used while running the operation.
   * @param operation The operation to run or enqueue.
   * @param onProgress Receives queue / spawn / output / exit / error progress events.
   */
  async runOrQueueOperation(
    ctx: RepositoryContext,
    operation: RunnableOperation,
    onProgress: (progress: OperationProgress) => void,
  ): Promise<'ran' | 'skipped'> {
    const {tracker, logger} = ctx;
    if (this.runningOperation != null) {
      this.queuedOperations.push({...operation, tracker});
      const deferred = defer<'ran' | 'skipped'>();
      this.deferredOperations.set(operation.id, deferred);
      onProgress({id: operation.id, kind: 'queue', queue: this.queuedOperations.map(o => o.id)});

      logger.log('queued operation:', operation.args.join(' '));
      return deferred.promise;
    }
    this.runningOperation = operation;
    this.runningOperationStartTime = new Date();

    // Translate the positional reporter calls made by the runner into OperationProgress events.
    const handleCommandProgress: OperationCommandProgressReporter = (...args) => {
      switch (args[0]) {
        case 'spawn':
          onProgress({
            id: operation.id,
            kind: 'spawn',
            queue: this.queuedOperations.map(op => op.id),
          });
          break;
        case 'stdout':
          onProgress({id: operation.id, kind: 'stdout', message: args[1]});
          break;
        case 'progress':
          onProgress({id: operation.id, kind: 'progress', progress: args[1]});
          break;
        case 'inlineProgress':
          onProgress({id: operation.id, kind: 'inlineProgress', hash: args[1], message: args[2]});
          break;
        case 'warning':
          onProgress({id: operation.id, kind: 'warning', warning: args[1]});
          break;
        case 'stderr':
          onProgress({id: operation.id, kind: 'stderr', message: args[1]});
          break;
        case 'exit':
          onProgress({id: operation.id, kind: 'exit', exitCode: args[1], timestamp: Date.now()});
          break;
      }
    };

    try {
      const controller = newAbortController();
      this.abortController = controller;
      await tracker.operation(
        operation.trackEventName,
        'RunOperationError',
        {extras: {args: operation.args, runner: operation.runner}, operationId: operation.id},
        _p => this.runCallback(ctx, operation, handleCommandProgress, controller.signal),
      );
    } catch (err) {
      // Use String() instead of casting and calling .toString(): anything can be thrown,
      // and a thrown null/undefined would otherwise raise here, before the queue is
      // cleared, leaving every queued caller waiting forever.
      const errString = String(err);
      logger.log('error running operation: ', operation.args[0], errString);
      onProgress({id: operation.id, kind: 'error', error: errString});

      if (this.queuedOperations.length > 0) {
        logger.log('the above error removed queued operations:');
        // clear queue to run when we hit an error,
        // which also requires resolving all their promises
        for (const queued of this.queuedOperations) {
          logger.log(' > ', queued.args.join(' ')); // print out commands that are skipped, in case it's useful to debug or recover data
          this.resolveDeferredPromise(queued.id, 'skipped');
        }
      }
      this.queuedOperations = [];
    } finally {
      this.runningOperationStartTime = undefined;
      this.runningOperation = undefined;
      // The controller belongs to the operation that just finished; don't keep it around.
      this.abortController = undefined;

      // resolve original enqueuer's promise
      this.resolveDeferredPromise(operation.id, 'ran');
    }

    // now that this operation has finished, dequeue the next
    const next = this.queuedOperations.shift();
    if (next != null) {
      // don't await this, the caller should resolve when the original operation finishes.
      // NOTE(review): the dequeued operation reuses this call's `ctx` and `onProgress`, not the
      // ones passed when it was enqueued (the `tracker` stored with it is not consulted here).
      // TODO: should we be keeping the newer onProgress in the queued operation?
      void this.runOrQueueOperation(ctx, next, onProgress).catch((err: unknown) => {
        // Nothing awaits this promise, so report failures rather than leaving them unhandled.
        logger.log('error running queued operation: ', String(err));
      });
    } else {
      // Attempt to free some memory.
      clearTrackedCache();
    }

    return 'ran';
  }

  /**
   * Resolve and forget the promise handed to the caller that enqueued operation `id`.
   * Does nothing when no promise is registered for `id`, e.g. the operation ran
   * immediately without being queued.
   */
  private resolveDeferredPromise(id: string, kind: 'ran' | 'skipped') {
    const found = this.deferredOperations.get(id);
    if (found != null) {
      found.resolve(kind);
      this.deferredOperations.delete(id);
    }
  }

  /**
   * Get the running operation start time.
   * Returns `undefined` if there is no running operation.
   */
  getRunningOperationStartTime(): Date | undefined {
    if (this.runningOperation == null) {
      return undefined;
    }
    return this.runningOperationStartTime;
  }

  /**
   * Send kill signal to the running operation if the operationId matches.
   * If the process exits, the exit event will be noticed by the queue.
   * This function does not block on waiting for the operation process to exit.
   */
  abortRunningOperation(operationId: string) {
    if (this.runningOperation?.id === operationId) {
      this.abortController?.abort();
    }
  }

  /** The currently running operation. */
  getRunningOperation(): RunnableOperation | undefined {
    return this.runningOperation;
  }
}
182