addons/shared/RateLimiter.tsblame
View source
b69ab311/**
b69ab312 * Copyright (c) Meta Platforms, Inc. and affiliates.
b69ab313 *
b69ab314 * This source code is licensed under the MIT license found in the
b69ab315 * LICENSE file in the root directory of this source tree.
b69ab316 */
b69ab317
b69ab318import {TypedEventEmitter} from './TypedEventEmitter';
b69ab319
b69ab3110type Id = number;
b69ab3111
b69ab3112/**
b69ab3113 * Rate limits requests to run an arbitrary task.
b69ab3114 * Up to `maxSimultaneousRunning` tasks can run at once,
b69ab3115 * further requests will be queued and run when a running task finishes.
b69ab3116 *
b69ab3117 * Usage:
b69ab3118 * ```
b69ab3119 * const rateLimiter = new RateLimiter(5);
b69ab3120 * const result = await rateLimiter.enqueueRun(() => {
b69ab3121 * // ...do arbitrary async work...
b69ab3122 * });
b69ab3123 * ```
b69ab3124 */
b69ab3125export class RateLimiter {
b69ab3126 private queued: Array<Id> = [];
b69ab3127 private running: Array<Id> = [];
b69ab3128 private runs = new TypedEventEmitter<'run', Id>();
b69ab3129
b69ab3130 constructor(
b69ab3131 private maxSimultaneousRunning: number,
b69ab3132 private log?: (s: string) => unknown,
b69ab3133 ) {}
b69ab3134
b69ab3135 private nextId = 1;
b69ab3136 private generateId(): Id {
b69ab3137 return this.nextId++;
b69ab3138 }
b69ab3139
b69ab3140 async enqueueRun<T>(runner: () => Promise<T>): Promise<T> {
b69ab3141 const id = this.generateId();
b69ab3142
b69ab3143 this.queued.push(id);
b69ab3144 this.tryDequeueNext();
b69ab3145
b69ab3146 if (!this.running.includes(id)) {
b69ab3147 this.log?.(`${this.running.length} tasks are already running, enqueuing ID:${id}`);
b69ab3148 await new Promise(res => {
b69ab3149 this.runs.on('run', ran => {
b69ab3150 if (ran === id) {
b69ab3151 this.log?.(`now allowing ID:${id} to run`);
b69ab3152 res(undefined);
b69ab3153 }
b69ab3154 });
b69ab3155 });
b69ab3156 }
b69ab3157
b69ab3158 try {
b69ab3159 return await runner();
b69ab3160 } finally {
b69ab3161 this.notifyFinished(id);
b69ab3162 }
b69ab3163 }
b69ab3164
b69ab3165 private notifyFinished(id: Id): void {
b69ab3166 this.running = this.running.filter(running => running !== id);
b69ab3167 this.tryDequeueNext();
b69ab3168 }
b69ab3169
b69ab3170 private tryDequeueNext() {
b69ab3171 if (this.running.length < this.maxSimultaneousRunning) {
b69ab3172 const toRun = this.queued.shift();
b69ab3173 if (toRun != null) {
b69ab3174 this.run(toRun);
b69ab3175 }
b69ab3176 }
b69ab3177 }
b69ab3178
b69ab3179 private run(id: Id) {
b69ab3180 this.running.push(id);
b69ab3181 this.runs.emit('run', id);
b69ab3182 }
b69ab3183}