addons/isl-server/src/OperationQueue.tsblame
View source
b69ab311/**
b69ab312 * Copyright (c) Meta Platforms, Inc. and affiliates.
b69ab313 *
b69ab314 * This source code is licensed under the MIT license found in the
b69ab315 * LICENSE file in the root directory of this source tree.
b69ab316 */
b69ab317
b69ab318import type {
b69ab319 OperationCommandProgressReporter,
b69ab3110 OperationProgress,
b69ab3111 RunnableOperation,
b69ab3112} from 'isl/src/types';
b69ab3113import type {Deferred} from 'shared/utils';
b69ab3114import type {ServerSideTracker} from './analytics/serverSideTracker';
b69ab3115import type {RepositoryContext} from './serverTypes';
b69ab3116
b69ab3117import {clearTrackedCache} from 'shared/LRU';
b69ab3118import {newAbortController} from 'shared/compat';
b69ab3119import {defer} from 'shared/utils';
b69ab3120
/**
 * Handle running & queueing all Operations so that only one Operation runs at once.
 * Operations may be run by sl in the Repository or other providers like ghstack in the RemoteRepository.
 *
 * The queue itself does not know how to execute an operation: that is delegated to the
 * `runCallback` passed to the constructor. The queue only tracks which operation is running,
 * which ones are waiting, and forwards progress events to the caller.
 */
export class OperationQueue {
  /**
   * @param runCallback Executes one operation. It receives the context and operation given to
   *   `runOrQueueOperation`, a reporter to emit progress events through, and an `AbortSignal`
   *   that is aborted by `abortRunningOperation`. A rejected promise is treated as the
   *   operation failing.
   */
  constructor(
    private runCallback: (
      ctx: RepositoryContext,
      operation: RunnableOperation,
      handleProgress: OperationCommandProgressReporter,
      signal: AbortSignal,
    ) => Promise<unknown>,
  ) {}

  /** Operations waiting for the running operation to finish, in FIFO order. */
  private queuedOperations: Array<RunnableOperation & {tracker: ServerSideTracker}> = [];
  /** The operation currently executing, if any. */
  private runningOperation: RunnableOperation | undefined = undefined;
  /** When `runningOperation` started. Cleared together with it. */
  private runningOperationStartTime: Date | undefined = undefined;
  /** Controller whose signal was handed to the running operation. Cleared when it finishes. */
  private abortController: AbortController | undefined = undefined;
  /** Pending promises for queued operations, keyed by operation id. */
  private deferredOperations = new Map<string, Deferred<'ran' | 'skipped'>>();

  /**
   * Run an operation, or if one is already running, add it to the queue.
   * Promise resolves with:
   * - 'ran', when the operation exits (no matter success/failure), even if it was enqueued.
   * - 'skipped', when the operation is never going to be run, since an earlier queued command errored.
   *
   * @param ctx Context providing the `tracker` and `logger` used while running the operation.
   * @param operation The operation to run or enqueue.
   * @param onProgress Receives every progress event (queue, spawn, output, exit, error).
   */
  async runOrQueueOperation(
    ctx: RepositoryContext,
    operation: RunnableOperation,
    onProgress: (progress: OperationProgress) => void,
  ): Promise<'ran' | 'skipped'> {
    const {tracker, logger} = ctx;
    if (this.runningOperation != null) {
      // Something is already running: enqueue, and hand back a promise that is resolved
      // later, either when this operation has run or when it gets skipped.
      this.queuedOperations.push({...operation, tracker});
      const deferred = defer<'ran' | 'skipped'>();
      this.deferredOperations.set(operation.id, deferred);
      onProgress({id: operation.id, kind: 'queue', queue: this.queuedOperations.map(o => o.id)});

      logger.log('queued operation:', operation.args.join(' '));
      return deferred.promise;
    }
    this.runningOperation = operation;
    this.runningOperationStartTime = new Date();

    // Translate the runner's positional progress reports into OperationProgress events
    // tagged with this operation's id. Unrecognized report kinds are ignored.
    const handleCommandProgress: OperationCommandProgressReporter = (...args) => {
      switch (args[0]) {
        case 'spawn':
          onProgress({
            id: operation.id,
            kind: 'spawn',
            queue: this.queuedOperations.map(op => op.id),
          });
          break;
        case 'stdout':
          onProgress({id: operation.id, kind: 'stdout', message: args[1]});
          break;
        case 'progress':
          onProgress({id: operation.id, kind: 'progress', progress: args[1]});
          break;
        case 'inlineProgress':
          onProgress({id: operation.id, kind: 'inlineProgress', hash: args[1], message: args[2]});
          break;
        case 'warning':
          onProgress({id: operation.id, kind: 'warning', warning: args[1]});
          break;
        case 'stderr':
          onProgress({id: operation.id, kind: 'stderr', message: args[1]});
          break;
        case 'exit':
          onProgress({id: operation.id, kind: 'exit', exitCode: args[1], timestamp: Date.now()});
          break;
      }
    };

    try {
      const controller = newAbortController();
      this.abortController = controller;
      await tracker.operation(
        operation.trackEventName,
        'RunOperationError',
        {extras: {args: operation.args, runner: operation.runner}, operationId: operation.id},
        _p => this.runCallback(ctx, operation, handleCommandProgress, controller.signal),
      );
    } catch (err) {
      // `err` may be any thrown value, including `undefined` or `null`. String() handles all of
      // those without throwing, and yields the same text as `.toString()` for Error instances.
      // Throwing here would leave every queued operation's promise unresolved.
      const errString = String(err);
      logger.log('error running operation: ', operation.args[0], errString);
      onProgress({id: operation.id, kind: 'error', error: errString});

      if (this.queuedOperations.length > 0) {
        logger.log('the above error removed queued operations:');
        // clear queue to run when we hit an error,
        // which also requires resolving all their promises
        for (const queued of this.queuedOperations) {
          logger.log(' > ', queued.args.join(' ')); // print out commands that are skipped, in case it's useful to debug or recover data
          this.resolveDeferredPromise(queued.id, 'skipped');
        }
      }
      this.queuedOperations = [];
    } finally {
      this.runningOperationStartTime = undefined;
      this.runningOperation = undefined;
      // Drop the controller so a finished operation's signal is not retained.
      this.abortController = undefined;

      // resolve original enqueuer's promise
      this.resolveDeferredPromise(operation.id, 'ran');
    }

    // This operation is finished (on failure the queue was emptied above), so start the next
    // queued operation, if there is one.
    const next = this.queuedOperations.shift();
    if (next != null) {
      // don't await this, the caller should resolve when the original operation finishes.
      // NOTE(review): the queued entry carries its own `tracker`, but the finished operation's
      // `ctx` (and therefore its tracker) is what gets passed along here — confirm intent.
      void this.runOrQueueOperation(
        ctx,
        next,
        // TODO: we're using the onProgress from the LAST `runOperation`... should we be keeping the newer onProgress in the queued operation?
        onProgress,
      );
    } else {
      // Attempt to free some memory.
      clearTrackedCache();
    }

    return 'ran';
  }

  /**
   * Resolve and forget the deferred promise registered for a queued operation.
   * Does nothing if `id` has no pending promise (e.g. the operation ran without being queued).
   */
  private resolveDeferredPromise(id: string, kind: 'ran' | 'skipped') {
    const found = this.deferredOperations.get(id);
    if (found != null) {
      found.resolve(kind);
      this.deferredOperations.delete(id);
    }
  }

  /**
   * Get the running operation start time.
   * Returns `undefined` if there is no running operation.
   */
  getRunningOperationStartTime(): Date | undefined {
    if (this.runningOperation == null) {
      return undefined;
    }
    return this.runningOperationStartTime;
  }

  /**
   * Send kill signal to the running operation if the operationId matches.
   * If the process exits, the exit event will be noticed by the queue.
   * This function does not block on waiting for the operation process to exit.
   * Queued (not yet running) operations are not affected.
   */
  abortRunningOperation(operationId: string) {
    if (this.runningOperation?.id === operationId) {
      this.abortController?.abort();
    }
  }

  /** The currently running operation. */
  getRunningOperation(): RunnableOperation | undefined {
    return this.runningOperation;
  }
}