| b69ab31 | | | 1 | /** |
| b69ab31 | | | 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| b69ab31 | | | 3 | * |
| b69ab31 | | | 4 | * This source code is licensed under the MIT license found in the |
| b69ab31 | | | 5 | * LICENSE file in the root directory of this source tree. |
| b69ab31 | | | 6 | */ |
| b69ab31 | | | 7 | |
| b69ab31 | | | 8 | import type { |
| b69ab31 | | | 9 | OperationCommandProgressReporter, |
| b69ab31 | | | 10 | OperationProgress, |
| b69ab31 | | | 11 | RunnableOperation, |
| b69ab31 | | | 12 | } from 'isl/src/types'; |
| b69ab31 | | | 13 | import type {Deferred} from 'shared/utils'; |
| b69ab31 | | | 14 | import type {ServerSideTracker} from './analytics/serverSideTracker'; |
| b69ab31 | | | 15 | import type {RepositoryContext} from './serverTypes'; |
| b69ab31 | | | 16 | |
| b69ab31 | | | 17 | import {clearTrackedCache} from 'shared/LRU'; |
| b69ab31 | | | 18 | import {newAbortController} from 'shared/compat'; |
| b69ab31 | | | 19 | import {defer} from 'shared/utils'; |
| b69ab31 | | | 20 | |
| b69ab31 | | | 21 | /** |
| b69ab31 | | | 22 | * Handle running & queueing all Operations so that only one Operation runs at once. |
| b69ab31 | | | 23 | * Operations may be run by sl in the Repository or other providers like ghstack in the RemoteRepository. |
| b69ab31 | | | 24 | */ |
| b69ab31 | | | 25 | export class OperationQueue { |
| b69ab31 | | | 26 | constructor( |
| b69ab31 | | | 27 | private runCallback: ( |
| b69ab31 | | | 28 | ctx: RepositoryContext, |
| b69ab31 | | | 29 | operation: RunnableOperation, |
| b69ab31 | | | 30 | handleProgress: OperationCommandProgressReporter, |
| b69ab31 | | | 31 | signal: AbortSignal, |
| b69ab31 | | | 32 | ) => Promise<unknown>, |
| b69ab31 | | | 33 | ) {} |
| b69ab31 | | | 34 | |
| b69ab31 | | | 35 | private queuedOperations: Array<RunnableOperation & {tracker: ServerSideTracker}> = []; |
| b69ab31 | | | 36 | private runningOperation: RunnableOperation | undefined = undefined; |
| b69ab31 | | | 37 | private runningOperationStartTime: Date | undefined = undefined; |
| b69ab31 | | | 38 | private abortController: AbortController | undefined = undefined; |
| b69ab31 | | | 39 | private deferredOperations = new Map<string, Deferred<'ran' | 'skipped'>>(); |
| b69ab31 | | | 40 | |
| b69ab31 | | | 41 | /** |
| b69ab31 | | | 42 | * Run an operation, or if one is already running, add it to the queue. |
| b69ab31 | | | 43 | * Promise resolves with: |
| b69ab31 | | | 44 | * - 'ran', when the operation exits (no matter success/failure), even if it was enqueued. |
| b69ab31 | | | 45 | * - 'skipped', when the operation is never going to be run, since an earlier queued command errored. |
| b69ab31 | | | 46 | */ |
| b69ab31 | | | 47 | async runOrQueueOperation( |
| b69ab31 | | | 48 | ctx: RepositoryContext, |
| b69ab31 | | | 49 | operation: RunnableOperation, |
| b69ab31 | | | 50 | onProgress: (progress: OperationProgress) => void, |
| b69ab31 | | | 51 | ): Promise<'ran' | 'skipped'> { |
| b69ab31 | | | 52 | const {tracker, logger} = ctx; |
| b69ab31 | | | 53 | if (this.runningOperation != null) { |
| b69ab31 | | | 54 | this.queuedOperations.push({...operation, tracker}); |
| b69ab31 | | | 55 | const deferred = defer<'ran' | 'skipped'>(); |
| b69ab31 | | | 56 | this.deferredOperations.set(operation.id, deferred); |
| b69ab31 | | | 57 | onProgress({id: operation.id, kind: 'queue', queue: this.queuedOperations.map(o => o.id)}); |
| b69ab31 | | | 58 | |
| b69ab31 | | | 59 | logger.log('queued operation:', operation.args.join(' ')); |
| b69ab31 | | | 60 | return deferred.promise; |
| b69ab31 | | | 61 | } |
| b69ab31 | | | 62 | this.runningOperation = operation; |
| b69ab31 | | | 63 | this.runningOperationStartTime = new Date(); |
| b69ab31 | | | 64 | |
| b69ab31 | | | 65 | const handleCommandProgress: OperationCommandProgressReporter = (...args) => { |
| b69ab31 | | | 66 | switch (args[0]) { |
| b69ab31 | | | 67 | case 'spawn': |
| b69ab31 | | | 68 | onProgress({ |
| b69ab31 | | | 69 | id: operation.id, |
| b69ab31 | | | 70 | kind: 'spawn', |
| b69ab31 | | | 71 | queue: this.queuedOperations.map(op => op.id), |
| b69ab31 | | | 72 | }); |
| b69ab31 | | | 73 | break; |
| b69ab31 | | | 74 | case 'stdout': |
| b69ab31 | | | 75 | onProgress({id: operation.id, kind: 'stdout', message: args[1]}); |
| b69ab31 | | | 76 | break; |
| b69ab31 | | | 77 | case 'progress': |
| b69ab31 | | | 78 | onProgress({id: operation.id, kind: 'progress', progress: args[1]}); |
| b69ab31 | | | 79 | break; |
| b69ab31 | | | 80 | case 'inlineProgress': |
| b69ab31 | | | 81 | onProgress({id: operation.id, kind: 'inlineProgress', hash: args[1], message: args[2]}); |
| b69ab31 | | | 82 | break; |
| b69ab31 | | | 83 | case 'warning': |
| b69ab31 | | | 84 | onProgress({id: operation.id, kind: 'warning', warning: args[1]}); |
| b69ab31 | | | 85 | break; |
| b69ab31 | | | 86 | case 'stderr': |
| b69ab31 | | | 87 | onProgress({id: operation.id, kind: 'stderr', message: args[1]}); |
| b69ab31 | | | 88 | break; |
| b69ab31 | | | 89 | case 'exit': |
| b69ab31 | | | 90 | onProgress({id: operation.id, kind: 'exit', exitCode: args[1], timestamp: Date.now()}); |
| b69ab31 | | | 91 | break; |
| b69ab31 | | | 92 | } |
| b69ab31 | | | 93 | }; |
| b69ab31 | | | 94 | |
| b69ab31 | | | 95 | try { |
| b69ab31 | | | 96 | const controller = newAbortController(); |
| b69ab31 | | | 97 | this.abortController = controller; |
| b69ab31 | | | 98 | await tracker.operation( |
| b69ab31 | | | 99 | operation.trackEventName, |
| b69ab31 | | | 100 | 'RunOperationError', |
| b69ab31 | | | 101 | {extras: {args: operation.args, runner: operation.runner}, operationId: operation.id}, |
| b69ab31 | | | 102 | _p => this.runCallback(ctx, operation, handleCommandProgress, controller.signal), |
| b69ab31 | | | 103 | ); |
| b69ab31 | | | 104 | } catch (err) { |
| b69ab31 | | | 105 | const errString = (err as Error).toString(); |
| b69ab31 | | | 106 | logger.log('error running operation: ', operation.args[0], errString); |
| b69ab31 | | | 107 | onProgress({id: operation.id, kind: 'error', error: errString}); |
| b69ab31 | | | 108 | |
| b69ab31 | | | 109 | if (this.queuedOperations.length > 0) { |
| b69ab31 | | | 110 | logger.log('the above error removed queued operations:'); |
| b69ab31 | | | 111 | // clear queue to run when we hit an error, |
| b69ab31 | | | 112 | // which also requires resolving all their promises |
| b69ab31 | | | 113 | for (const queued of this.queuedOperations) { |
| b69ab31 | | | 114 | logger.log(' > ', queued.args.join(' ')); // print out commands that are skipped, in case it's useful to debug or recover data |
| b69ab31 | | | 115 | this.resolveDeferredPromise(queued.id, 'skipped'); |
| b69ab31 | | | 116 | } |
| b69ab31 | | | 117 | } |
| b69ab31 | | | 118 | this.queuedOperations = []; |
| b69ab31 | | | 119 | } finally { |
| b69ab31 | | | 120 | this.runningOperationStartTime = undefined; |
| b69ab31 | | | 121 | this.runningOperation = undefined; |
| b69ab31 | | | 122 | |
| b69ab31 | | | 123 | // resolve original enqueuer's promise |
| b69ab31 | | | 124 | this.resolveDeferredPromise(operation.id, 'ran'); |
| b69ab31 | | | 125 | } |
| b69ab31 | | | 126 | |
| b69ab31 | | | 127 | // now that we successfully ran this operation, dequeue the next |
| b69ab31 | | | 128 | if (this.queuedOperations.length > 0) { |
| b69ab31 | | | 129 | const op = this.queuedOperations.shift(); |
| b69ab31 | | | 130 | if (op != null) { |
| b69ab31 | | | 131 | // don't await this, the caller should resolve when the original operation finishes. |
| b69ab31 | | | 132 | this.runOrQueueOperation( |
| b69ab31 | | | 133 | ctx, |
| b69ab31 | | | 134 | op, |
| b69ab31 | | | 135 | // TODO: we're using the onProgress from the LAST `runOperation`... should we be keeping the newer onProgress in the queued operation? |
| b69ab31 | | | 136 | onProgress, |
| b69ab31 | | | 137 | ); |
| b69ab31 | | | 138 | } |
| b69ab31 | | | 139 | } else { |
| b69ab31 | | | 140 | // Attempt to free some memory. |
| b69ab31 | | | 141 | clearTrackedCache(); |
| b69ab31 | | | 142 | } |
| b69ab31 | | | 143 | |
| b69ab31 | | | 144 | return 'ran'; |
| b69ab31 | | | 145 | } |
| b69ab31 | | | 146 | |
| b69ab31 | | | 147 | private resolveDeferredPromise(id: string, kind: 'ran' | 'skipped') { |
| b69ab31 | | | 148 | const found = this.deferredOperations.get(id); |
| b69ab31 | | | 149 | if (found != null) { |
| b69ab31 | | | 150 | found.resolve(kind); |
| b69ab31 | | | 151 | this.deferredOperations.delete(id); |
| b69ab31 | | | 152 | } |
| b69ab31 | | | 153 | } |
| b69ab31 | | | 154 | |
| b69ab31 | | | 155 | /** |
| b69ab31 | | | 156 | * Get the running operation start time. |
| b69ab31 | | | 157 | * Returns `undefined` if there is no running operation. |
| b69ab31 | | | 158 | */ |
| b69ab31 | | | 159 | getRunningOperationStartTime(): Date | undefined { |
| b69ab31 | | | 160 | if (this.runningOperation == null) { |
| b69ab31 | | | 161 | return undefined; |
| b69ab31 | | | 162 | } |
| b69ab31 | | | 163 | return this.runningOperationStartTime; |
| b69ab31 | | | 164 | } |
| b69ab31 | | | 165 | |
| b69ab31 | | | 166 | /** |
| b69ab31 | | | 167 | * Send kill signal to the running operation if the operationId matches. |
| b69ab31 | | | 168 | * If the process exits, the exit event will be noticed by the queue. |
| b69ab31 | | | 169 | * This function does not block on waiting for the operation process to exit. |
| b69ab31 | | | 170 | */ |
| b69ab31 | | | 171 | abortRunningOperation(operationId: string) { |
| b69ab31 | | | 172 | if (this.runningOperation?.id == operationId) { |
| b69ab31 | | | 173 | this.abortController?.abort(); |
| b69ab31 | | | 174 | } |
| b69ab31 | | | 175 | } |
| b69ab31 | | | 176 | |
| b69ab31 | | | 177 | /** The currently running operation. */ |
| b69ab31 | | | 178 | getRunningOperation(): RunnableOperation | undefined { |
| b69ab31 | | | 179 | return this.runningOperation; |
| b69ab31 | | | 180 | } |
| b69ab31 | | | 181 | } |