addons/isl/src/LocalWebSocketEventBus.tsblame
View source
b69ab311/**
b69ab312 * Copyright (c) Meta Platforms, Inc. and affiliates.
b69ab313 *
b69ab314 * This source code is licensed under the MIT license found in the
b69ab315 * LICENSE file in the root directory of this source tree.
b69ab316 */
b69ab317
b69ab318import type {Disposable, MessageBusStatus, PlatformName} from './types';
b69ab319
b69ab3110import {CLOSED_AND_SHOULD_NOT_RECONNECT_CODE} from 'isl-server/src/constants';
b69ab3111import {logger} from './logger';
b69ab3112
b69ab3113export class LocalWebSocketEventBus {
b69ab3114 static MAX_RECONNECT_CHECK_TIME_MS = 60000;
b69ab3115 static DEFAULT_RECONNECT_CHECK_TIME_MS = 100;
b69ab3116
b69ab3117 private websocket: WebSocket;
b69ab3118 private status: MessageBusStatus = {type: 'initializing'};
b69ab3119 private exponentialReconnectDelay = LocalWebSocketEventBus.DEFAULT_RECONNECT_CHECK_TIME_MS;
b69ab3120 private queuedMessages: Array<string | ArrayBuffer> = [];
b69ab3121
b69ab3122 // A sub-state of "status", used by `startConnection` to avoid creating multiple
b69ab3123 // websockets while connecting.
b69ab3124 //
b69ab3125 // status.type: | 'initializing' | 'open' | 'reconnecting' | 'open'
b69ab3126 // opening: | true | false | false | true | false
b69ab3127 // ^^^^^^^ reconnect setTimeout
b69ab3128 private opening = false;
b69ab3129
b69ab3130 private handlers: Array<(event: MessageEvent<string>) => void | Promise<void>> = [];
b69ab3131 private statusChangeHandlers: Array<(newStatus: MessageBusStatus) => unknown> = [];
b69ab3132
b69ab3133 private disposed = false;
b69ab3134
b69ab3135 /**
b69ab3136 * @param host to use when creating the Web Socket to talk to the server. Should
b69ab3137 * include the hostname and optionally, a port, e.g., "localhost:3001" or "example.com".
b69ab3138 */
b69ab3139 constructor(
b69ab3140 private host: string,
b69ab3141 private WebSocketType: typeof WebSocket,
b69ab3142 private params: {token?: string; cwd?: string; sessionId?: string; platformName: PlatformName},
b69ab3143 ) {
b69ab3144 // startConnection already assigns to websocket, but we do it here so typescript knows websocket is always defined
b69ab3145 this.websocket = this.startConnection();
b69ab3146 }
b69ab3147
b69ab3148 public dispose() {
b69ab3149 if (this.disposed) {
b69ab3150 return;
b69ab3151 }
b69ab3152 this.disposed = true;
b69ab3153 this.websocket.close();
b69ab3154 }
b69ab3155
b69ab3156 private startConnection(): WebSocket {
b69ab3157 if (this.disposed || this.opening || this.status.type === 'open') {
b69ab3158 return this.websocket;
b69ab3159 }
b69ab3160 const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
b69ab3161 const wsUrl = new URL(`${wsProtocol}//${this.host}/ws`);
b69ab3162 const token = this.params.token;
b69ab3163 if (token) {
b69ab3164 wsUrl.searchParams.append('token', token);
b69ab3165 }
b69ab3166 const cwdParam = this.params.cwd;
b69ab3167 if (cwdParam) {
b69ab3168 const cwd = decodeURIComponent(cwdParam);
b69ab3169 wsUrl.searchParams.append('cwd', cwd);
b69ab3170 }
b69ab3171 const sessionIdParam = this.params.sessionId;
b69ab3172 if (sessionIdParam) {
b69ab3173 const sessionId = decodeURIComponent(sessionIdParam);
b69ab3174 wsUrl.searchParams.append('sessionId', sessionId);
b69ab3175 }
b69ab3176 const platformName = this.params.platformName;
b69ab3177 if (platformName) {
b69ab3178 wsUrl.searchParams.append('platform', platformName);
b69ab3179 }
b69ab3180 this.websocket = new this.WebSocketType(wsUrl.href);
b69ab3181 this.opening = true;
b69ab3182 this.websocket.addEventListener('open', event => {
b69ab3183 logger.info('websocket open', event);
b69ab3184 this.opening = false;
b69ab3185 this.exponentialReconnectDelay = LocalWebSocketEventBus.DEFAULT_RECONNECT_CHECK_TIME_MS;
b69ab3186
b69ab3187 this.websocket.addEventListener('message', e => {
b69ab3188 for (const handler of this.handlers) {
b69ab3189 handler(e);
b69ab3190 }
b69ab3191 });
b69ab3192
b69ab3193 // if any messages were sent while reconnecting, they were queued up.
b69ab3194 // Send them all now that we've reconnected
b69ab3195 while (this.queuedMessages.length > 0) {
b69ab3196 const queuedMessage = this.queuedMessages[0];
b69ab3197 this.websocket.send(queuedMessage);
b69ab3198 // only dequeue after successfully sending the message
b69ab3199 this.queuedMessages.shift();
b69ab31100 }
b69ab31101
b69ab31102 this.setStatus({type: 'open'});
b69ab31103 });
b69ab31104
b69ab31105 this.websocket.addEventListener('close', event => {
b69ab31106 this.opening = false;
b69ab31107 if (event.code === CLOSED_AND_SHOULD_NOT_RECONNECT_CODE) {
b69ab31108 // Don't schedule reconnect if the server told us this is a permanent failure,
b69ab31109 // e.g. invalid token
b69ab31110 this.setStatus({type: 'error', error: event.reason});
b69ab31111 return;
b69ab31112 }
b69ab31113 if (!this.disposed) {
b69ab31114 this.scheduleReconnect();
b69ab31115 }
b69ab31116 });
b69ab31117
b69ab31118 return this.websocket;
b69ab31119 }
b69ab31120
b69ab31121 private setStatus(status: MessageBusStatus) {
b69ab31122 this.status = status;
b69ab31123 this.statusChangeHandlers.forEach(handler => handler(status));
b69ab31124 }
b69ab31125
b69ab31126 private scheduleReconnect() {
b69ab31127 this.setStatus({type: 'reconnecting'});
b69ab31128 logger.info(`websocket connection closed. Retrying in ${this.exponentialReconnectDelay}ms`);
b69ab31129 setTimeout(() => {
b69ab31130 this.startConnection();
b69ab31131 }, this.exponentialReconnectDelay);
b69ab31132
b69ab31133 this.exponentialReconnectDelay = Math.min(
b69ab31134 this.exponentialReconnectDelay * 2,
b69ab31135 LocalWebSocketEventBus.MAX_RECONNECT_CHECK_TIME_MS,
b69ab31136 );
b69ab31137 }
b69ab31138
b69ab31139 onMessage(handler: (event: MessageEvent<string>) => void | Promise<void>): Disposable {
b69ab31140 // we need to track handlers ourself instead of directly calling this.websocket.addEventListener here,
b69ab31141 // since we'll get a new WebSocket on reconnect.
b69ab31142 this.handlers.push(handler);
b69ab31143 const dispose = () => {
b69ab31144 const foundIndex = this.handlers.indexOf(handler);
b69ab31145 if (foundIndex !== -1) {
b69ab31146 this.handlers.splice(foundIndex, 1);
b69ab31147 }
b69ab31148 };
b69ab31149 return {dispose};
b69ab31150 }
b69ab31151
b69ab31152 postMessage(message: string) {
b69ab31153 if (this.status.type === 'open') {
b69ab31154 this.websocket.send(message);
b69ab31155 } else {
b69ab31156 this.queuedMessages.push(message);
b69ab31157 }
b69ab31158 }
b69ab31159
b69ab31160 onChangeStatus(handler: (newStatus: MessageBusStatus) => void | Promise<void>): Disposable {
b69ab31161 this.statusChangeHandlers.push(handler);
b69ab31162 handler(this.status); // seed with current status
b69ab31163 const dispose = () => {
b69ab31164 const foundIndex = this.statusChangeHandlers.indexOf(handler);
b69ab31165 if (foundIndex !== -1) {
b69ab31166 this.statusChangeHandlers.splice(foundIndex, 1);
b69ab31167 }
b69ab31168 };
b69ab31169 return {dispose};
b69ab31170 }
b69ab31171
b69ab31172 forceDisconnect(durationMs = 1000) {
b69ab31173 this.websocket.close();
b69ab31174 this.exponentialReconnectDelay = durationMs;
b69ab31175 this.scheduleReconnect();
b69ab31176 }
b69ab31177}