| 1 | /** |
| 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| 3 | * |
| 4 | * This source code is licensed under the MIT license found in the |
| 5 | * LICENSE file in the root directory of this source tree. |
| 6 | */ |
| 7 | |
| 8 | import type { |
| 9 | OperationCommandProgressReporter, |
| 10 | OperationProgress, |
| 11 | RunnableOperation, |
| 12 | } from 'isl/src/types'; |
| 13 | import type {Deferred} from 'shared/utils'; |
| 14 | import type {ServerSideTracker} from './analytics/serverSideTracker'; |
| 15 | import type {RepositoryContext} from './serverTypes'; |
| 16 | |
| 17 | import {clearTrackedCache} from 'shared/LRU'; |
| 18 | import {newAbortController} from 'shared/compat'; |
| 19 | import {defer} from 'shared/utils'; |
| 20 | |
/**
 * Handle running & queueing all Operations so that only one Operation runs at once.
 * Operations may be run by sl in the Repository or other providers like ghstack in the RemoteRepository.
 */
export class OperationQueue {
  /**
   * @param runCallback Executes a single operation. It is given the context, the
   * operation, a reporter for command progress, and the signal of the abort
   * controller that `abortRunningOperation` aborts. The queue waits for the
   * returned promise to settle before it starts the next queued operation.
   */
  constructor(
    private readonly runCallback: (
      ctx: RepositoryContext,
      operation: RunnableOperation,
      handleProgress: OperationCommandProgressReporter,
      signal: AbortSignal,
    ) => Promise<unknown>,
  ) {}

  /**
   * Operations waiting for the running operation to finish, in the order they were queued.
   * Each entry remembers the tracker of the context that queued it.
   */
  private queuedOperations: Array<RunnableOperation & {tracker: ServerSideTracker}> = [];
  /** The operation currently being run, if any. */
  private runningOperation: RunnableOperation | undefined = undefined;
  /** When the currently running operation was started. */
  private runningOperationStartTime: Date | undefined = undefined;
  /** Controller whose signal was handed to `runCallback` for the running operation. */
  private abortController: AbortController | undefined = undefined;
  /** Unresolved promises handed back to callers whose operation was queued, keyed by operation id. */
  private readonly deferredOperations = new Map<string, Deferred<'ran' | 'skipped'>>();

  /**
   * Run an operation, or if one is already running, add it to the queue.
   * Promise resolves with:
   * - 'ran', when the operation exits (no matter success/failure), even if it was enqueued.
   * - 'skipped', when the operation is never going to be run, since an earlier queued command errored.
   *
   * The returned promise does not wait for operations that were queued behind this one.
   *
   * @param ctx Context providing the tracker and logger used for this operation.
   * @param operation The operation to run or enqueue.
   * @param onProgress Receives queue/spawn/output/exit/error progress for the operation.
   */
  async runOrQueueOperation(
    ctx: RepositoryContext,
    operation: RunnableOperation,
    onProgress: (progress: OperationProgress) => void,
  ): Promise<'ran' | 'skipped'> {
    const {tracker, logger} = ctx;
    if (this.runningOperation != null) {
      // Something is already running: remember the operation (and who tracks it)
      // and hand back a promise that is resolved once it has run or been skipped.
      this.queuedOperations.push({...operation, tracker});
      const deferred = defer<'ran' | 'skipped'>();
      this.deferredOperations.set(operation.id, deferred);
      onProgress({id: operation.id, kind: 'queue', queue: this.queuedOperations.map(o => o.id)});

      logger.log('queued operation:', operation.args.join(' '));
      return deferred.promise;
    }
    this.runningOperation = operation;
    this.runningOperationStartTime = new Date();

    // Translate the positional arguments reported by the runner into
    // `OperationProgress` messages tagged with this operation's id.
    const handleCommandProgress: OperationCommandProgressReporter = (...args) => {
      switch (args[0]) {
        case 'spawn':
          onProgress({
            id: operation.id,
            kind: 'spawn',
            queue: this.queuedOperations.map(op => op.id),
          });
          break;
        case 'stdout':
          onProgress({id: operation.id, kind: 'stdout', message: args[1]});
          break;
        case 'progress':
          onProgress({id: operation.id, kind: 'progress', progress: args[1]});
          break;
        case 'inlineProgress':
          onProgress({id: operation.id, kind: 'inlineProgress', hash: args[1], message: args[2]});
          break;
        case 'warning':
          onProgress({id: operation.id, kind: 'warning', warning: args[1]});
          break;
        case 'stderr':
          onProgress({id: operation.id, kind: 'stderr', message: args[1]});
          break;
        case 'exit':
          onProgress({id: operation.id, kind: 'exit', exitCode: args[1], timestamp: Date.now()});
          break;
      }
    };

    try {
      const controller = newAbortController();
      this.abortController = controller;
      await tracker.operation(
        operation.trackEventName,
        'RunOperationError',
        {extras: {args: operation.args, runner: operation.runner}, operationId: operation.id},
        _p => this.runCallback(ctx, operation, handleCommandProgress, controller.signal),
      );
    } catch (err) {
      // `String(err)` equals `err.toString()` for Error values, but unlike the
      // method call it does not throw when the thrown value is null/undefined.
      const errString = String(err);
      logger.log('error running operation: ', operation.args[0], errString);
      onProgress({id: operation.id, kind: 'error', error: errString});

      if (this.queuedOperations.length > 0) {
        logger.log('the above error removed queued operations:');
        // clear queue to run when we hit an error,
        // which also requires resolving all their promises
        for (const queued of this.queuedOperations) {
          logger.log(' > ', queued.args.join(' ')); // print out commands that are skipped, in case it's useful to debug or recover data
          this.resolveDeferredPromise(queued.id, 'skipped');
        }
      }
      this.queuedOperations = [];
    } finally {
      this.runningOperationStartTime = undefined;
      this.runningOperation = undefined;
      // The controller belongs to the operation that just ended; don't keep it around.
      this.abortController = undefined;

      // resolve original enqueuer's promise
      this.resolveDeferredPromise(operation.id, 'ran');
    }

    // This operation is done (successfully or not). If it failed, the queue was
    // emptied above, so there is nothing left to dequeue.
    const next = this.queuedOperations.shift();
    if (next == null) {
      // Attempt to free some memory.
      clearTrackedCache();
      return 'ran';
    }

    // Run the queued operation with the tracker of the context that queued it,
    // and without the bookkeeping `tracker` property on the operation itself.
    const {tracker: queuedTracker, ...nextOperation} = next;
    // don't await this, the caller should resolve when the original operation finishes.
    void this.runOrQueueOperation(
      {...ctx, tracker: queuedTracker},
      nextOperation,
      // TODO: we're using the onProgress from the LAST `runOperation`... should we be keeping the newer onProgress in the queued operation?
      onProgress,
    ).catch((queuedErr: unknown) => {
      // Nothing awaits this promise, so make sure a rejection cannot go unhandled.
      logger.log('error running queued operation: ', String(queuedErr));
    });

    return 'ran';
  }

  /**
   * Resolve and forget the pending promise registered for a queued operation.
   * Does nothing if no promise is registered under `id` (e.g. the operation was never queued).
   */
  private resolveDeferredPromise(id: string, kind: 'ran' | 'skipped') {
    const found = this.deferredOperations.get(id);
    if (found != null) {
      found.resolve(kind);
      this.deferredOperations.delete(id);
    }
  }

  /**
   * Get the running operation start time.
   * Returns `undefined` if there is no running operation.
   */
  getRunningOperationStartTime(): Date | undefined {
    if (this.runningOperation == null) {
      return undefined;
    }
    return this.runningOperationStartTime;
  }

  /**
   * Abort the running operation if the operationId matches, by aborting the
   * controller whose signal was passed to `runCallback`.
   * If the process exits, the exit event will be noticed by the queue.
   * This function does not block on waiting for the operation process to exit.
   */
  abortRunningOperation(operationId: string) {
    if (this.runningOperation?.id === operationId) {
      this.abortController?.abort();
    }
  }

  /** The currently running operation. */
  getRunningOperation(): RunnableOperation | undefined {
    return this.runningOperation;
  }
}
| 182 | |