5.9 KB178 lines
Blame
1/**
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8import type {Disposable, MessageBusStatus, PlatformName} from './types';
9
10import {CLOSED_AND_SHOULD_NOT_RECONNECT_CODE} from 'isl-server/src/constants';
11import {logger} from './logger';
12
13export class LocalWebSocketEventBus {
14 static MAX_RECONNECT_CHECK_TIME_MS = 60000;
15 static DEFAULT_RECONNECT_CHECK_TIME_MS = 100;
16
17 private websocket: WebSocket;
18 private status: MessageBusStatus = {type: 'initializing'};
19 private exponentialReconnectDelay = LocalWebSocketEventBus.DEFAULT_RECONNECT_CHECK_TIME_MS;
20 private queuedMessages: Array<string | ArrayBuffer> = [];
21
22 // A sub-state of "status", used by `startConnection` to avoid creating multiple
23 // websockets while connecting.
24 //
25 // status.type: | 'initializing' | 'open' | 'reconnecting' | 'open'
26 // opening: | true | false | false | true | false
27 // ^^^^^^^ reconnect setTimeout
28 private opening = false;
29
30 private handlers: Array<(event: MessageEvent<string>) => void | Promise<void>> = [];
31 private statusChangeHandlers: Array<(newStatus: MessageBusStatus) => unknown> = [];
32
33 private disposed = false;
34
35 /**
36 * @param host to use when creating the Web Socket to talk to the server. Should
37 * include the hostname and optionally, a port, e.g., "localhost:3001" or "example.com".
38 */
39 constructor(
40 private host: string,
41 private WebSocketType: typeof WebSocket,
42 private params: {token?: string; cwd?: string; sessionId?: string; platformName: PlatformName},
43 ) {
44 // startConnection already assigns to websocket, but we do it here so typescript knows websocket is always defined
45 this.websocket = this.startConnection();
46 }
47
48 public dispose() {
49 if (this.disposed) {
50 return;
51 }
52 this.disposed = true;
53 this.websocket.close();
54 }
55
56 private startConnection(): WebSocket {
57 if (this.disposed || this.opening || this.status.type === 'open') {
58 return this.websocket;
59 }
60 const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
61 const wsUrl = new URL(`${wsProtocol}//${this.host}/ws`);
62 const token = this.params.token;
63 if (token) {
64 wsUrl.searchParams.append('token', token);
65 }
66 const cwdParam = this.params.cwd;
67 if (cwdParam) {
68 const cwd = decodeURIComponent(cwdParam);
69 wsUrl.searchParams.append('cwd', cwd);
70 }
71 const sessionIdParam = this.params.sessionId;
72 if (sessionIdParam) {
73 const sessionId = decodeURIComponent(sessionIdParam);
74 wsUrl.searchParams.append('sessionId', sessionId);
75 }
76 const platformName = this.params.platformName;
77 if (platformName) {
78 wsUrl.searchParams.append('platform', platformName);
79 }
80 this.websocket = new this.WebSocketType(wsUrl.href);
81 this.opening = true;
82 this.websocket.addEventListener('open', event => {
83 logger.info('websocket open', event);
84 this.opening = false;
85 this.exponentialReconnectDelay = LocalWebSocketEventBus.DEFAULT_RECONNECT_CHECK_TIME_MS;
86
87 this.websocket.addEventListener('message', e => {
88 for (const handler of this.handlers) {
89 handler(e);
90 }
91 });
92
93 // if any messages were sent while reconnecting, they were queued up.
94 // Send them all now that we've reconnected
95 while (this.queuedMessages.length > 0) {
96 const queuedMessage = this.queuedMessages[0];
97 this.websocket.send(queuedMessage);
98 // only dequeue after successfully sending the message
99 this.queuedMessages.shift();
100 }
101
102 this.setStatus({type: 'open'});
103 });
104
105 this.websocket.addEventListener('close', event => {
106 this.opening = false;
107 if (event.code === CLOSED_AND_SHOULD_NOT_RECONNECT_CODE) {
108 // Don't schedule reconnect if the server told us this is a permanent failure,
109 // e.g. invalid token
110 this.setStatus({type: 'error', error: event.reason});
111 return;
112 }
113 if (!this.disposed) {
114 this.scheduleReconnect();
115 }
116 });
117
118 return this.websocket;
119 }
120
121 private setStatus(status: MessageBusStatus) {
122 this.status = status;
123 this.statusChangeHandlers.forEach(handler => handler(status));
124 }
125
126 private scheduleReconnect() {
127 this.setStatus({type: 'reconnecting'});
128 logger.info(`websocket connection closed. Retrying in ${this.exponentialReconnectDelay}ms`);
129 setTimeout(() => {
130 this.startConnection();
131 }, this.exponentialReconnectDelay);
132
133 this.exponentialReconnectDelay = Math.min(
134 this.exponentialReconnectDelay * 2,
135 LocalWebSocketEventBus.MAX_RECONNECT_CHECK_TIME_MS,
136 );
137 }
138
139 onMessage(handler: (event: MessageEvent<string>) => void | Promise<void>): Disposable {
140 // we need to track handlers ourself instead of directly calling this.websocket.addEventListener here,
141 // since we'll get a new WebSocket on reconnect.
142 this.handlers.push(handler);
143 const dispose = () => {
144 const foundIndex = this.handlers.indexOf(handler);
145 if (foundIndex !== -1) {
146 this.handlers.splice(foundIndex, 1);
147 }
148 };
149 return {dispose};
150 }
151
152 postMessage(message: string) {
153 if (this.status.type === 'open') {
154 this.websocket.send(message);
155 } else {
156 this.queuedMessages.push(message);
157 }
158 }
159
160 onChangeStatus(handler: (newStatus: MessageBusStatus) => void | Promise<void>): Disposable {
161 this.statusChangeHandlers.push(handler);
162 handler(this.status); // seed with current status
163 const dispose = () => {
164 const foundIndex = this.statusChangeHandlers.indexOf(handler);
165 if (foundIndex !== -1) {
166 this.statusChangeHandlers.splice(foundIndex, 1);
167 }
168 };
169 return {dispose};
170 }
171
172 forceDisconnect(durationMs = 1000) {
173 this.websocket.close();
174 this.exponentialReconnectDelay = durationMs;
175 this.scheduleReconnect();
176 }
177}
178