1.9 KB84 lines
Blame
1/**
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8import {TypedEventEmitter} from './TypedEventEmitter';
9
10type Id = number;
11
12/**
13 * Rate limits requests to run an arbitrary task.
14 * Up to `maxSimultaneousRunning` tasks can run at once,
15 * further requests will be queued and run when a running task finishes.
16 *
17 * Usage:
18 * ```
19 * const rateLimiter = new RateLimiter(5);
20 * const result = await rateLimiter.enqueueRun(() => {
21 * // ...do arbitrary async work...
22 * });
23 * ```
24 */
25export class RateLimiter {
26 private queued: Array<Id> = [];
27 private running: Array<Id> = [];
28 private runs = new TypedEventEmitter<'run', Id>();
29
30 constructor(
31 private maxSimultaneousRunning: number,
32 private log?: (s: string) => unknown,
33 ) {}
34
35 private nextId = 1;
36 private generateId(): Id {
37 return this.nextId++;
38 }
39
40 async enqueueRun<T>(runner: () => Promise<T>): Promise<T> {
41 const id = this.generateId();
42
43 this.queued.push(id);
44 this.tryDequeueNext();
45
46 if (!this.running.includes(id)) {
47 this.log?.(`${this.running.length} tasks are already running, enqueuing ID:${id}`);
48 await new Promise(res => {
49 this.runs.on('run', ran => {
50 if (ran === id) {
51 this.log?.(`now allowing ID:${id} to run`);
52 res(undefined);
53 }
54 });
55 });
56 }
57
58 try {
59 return await runner();
60 } finally {
61 this.notifyFinished(id);
62 }
63 }
64
65 private notifyFinished(id: Id): void {
66 this.running = this.running.filter(running => running !== id);
67 this.tryDequeueNext();
68 }
69
70 private tryDequeueNext() {
71 if (this.running.length < this.maxSimultaneousRunning) {
72 const toRun = this.queued.shift();
73 if (toRun != null) {
74 this.run(toRun);
75 }
76 }
77 }
78
79 private run(id: Id) {
80 this.running.push(id);
81 this.runs.emit('run', id);
82 }
83}
84