addons/isl/src/ClientToServerAPI.tsblame
View source
/**
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
b69ab317
b69ab318import type {MessageBus} from './MessageBus';
b69ab319import type {ClientToServerMessage, Disposable, ServerToClientMessage} from './types';
b69ab3110
b69ab3111import {defer} from 'shared/utils';
b69ab3112import platform from './platform';
b69ab3113import {deserializeFromString, serializeToString} from './serialize';
b69ab3114
/** Messages this client receives; alias of `ServerToClientMessage`. */
export type IncomingMessage = ServerToClientMessage;
/** Messages this client sends; alias of `ClientToServerMessage`. */
export type OutgoingMessage = ClientToServerMessage;

/**
 * Mutable switch read on every send and receive: when `shouldLog` is true,
 * each incoming and outgoing message is printed with `console.log`.
 */
export const debugLogMessageTraffic = {shouldLog: false};
b69ab3119
/**
 * Public surface of the typed client/server message channel.
 */
export interface ClientToServerAPI {
  /** Release the resources held by the channel. */
  dispose(): void;

  /**
   * Register `handler` for incoming messages whose `type` field equals `type`.
   * The returned `Disposable` removes the registration.
   */
  onMessageOfType<T extends IncomingMessage['type']>(
    type: T,
    handler: (event: IncomingMessage & {type: T}) => void | Promise<void>,
  ): Disposable;

  /** Send `message` to the server. */
  postMessage(message: OutgoingMessage): void;

  /**
   * Register `callback` to run when a connection is established or re-established.
   * The returned function cancels the registration.
   */
  onConnectOrReconnect(callback: () => unknown): () => void;
}
b69ab3130
/**
 * Message passing channel built on top of MessageBus.
 * Use to send and listen for well-typed events with the server.
 *
 * Incoming payloads are decoded with `deserializeFromString` and dispatched to the
 * handlers registered for the message's `type`. Outgoing messages are encoded with
 * `serializeToString` before being handed to the bus.
 */
class ClientToServerAPIImpl implements ClientToServerAPI {
  constructor(private messageBus: MessageBus) {}

  /** Handlers registered through `onMessageOfType`, keyed by message `type`. */
  private listenersByType = new Map<
    string,
    Set<(message: IncomingMessage) => void | Promise<void>>
  >();

  /** The single bus subscription: decodes each payload and fans it out by `type`. */
  private incomingListener = this.messageBus.onMessage(event => {
    const data = deserializeFromString(event.data as string) as IncomingMessage;
    if (debugLogMessageTraffic.shouldLog) {
      // eslint-disable-next-line no-console
      console.log('%c ⬅ Incoming ', 'color:white;background-color:tomato', data);
    }
    const {type} = data;
    const listeners = this.listenersByType.get(type);
    if (!listeners) {
      // Nobody ever registered for this message type.
      return;
    }
    listeners.forEach(handle => handle(data));
  });

  /** Stop listening to the underlying message bus. */
  dispose() {
    this.incomingListener.dispose();
  }

  /**
   * Register `handler` for incoming messages of the given `type`.
   *
   * @param type value of the `type` field to listen for
   * @param handler called with every incoming message of that type
   * @param dispose optional teardown hook, run each time the returned
   *   Disposable is disposed (just before `handler` is unregistered)
   * @returns a Disposable that unregisters `handler`
   */
  onMessageOfType<T extends IncomingMessage['type']>(
    type: T,
    handler: (event: IncomingMessage & {type: T}) => void | Promise<void>,
    dispose?: () => void,
  ): Disposable {
    let found = this.listenersByType.get(type);
    if (found == null) {
      found = new Set();
      this.listenersByType.set(type, found);
    }
    found.add(handler as (event: IncomingMessage) => void | Promise<void>);
    return {
      dispose: () => {
        const found = this.listenersByType.get(type);
        if (found) {
          dispose?.();
          found.delete(handler as (event: IncomingMessage) => void | Promise<void>);
        }
      },
    };
  }

  /**
   * Async generator that yields the given type of events.
   * The generator ends if the callsite uses `break`, `return`, or `throw` to
   * exit the loop body, or when a pending wait is rejected.
   *
   * The event listener will be set up immediately after calling this function,
   * before the first iteration, and torn down when exiting the loop.
   * Events that arrive while the consumer is busy are queued and yielded in order.
   *
   * NOTE(review): the 'Connection was dropped' rejection is only issued from the
   * listener's teardown hook, and nothing visible in this class disposes the
   * listener when the connection actually drops — confirm whether a
   * connection-status hook is intended.
   *
   * Typically used in an async function, like:
   *
   * ```
   * async function foo() {
   *   // Set up the listener before sending the request.
   *   const iter = clientToServerAPI.iterateMessageOfType('ResponseType');
   *   clientToServerAPI.postMessage({type: 'RequestType', ...});
   *   // Check responses until getting the one we look for.
   *   for await (const event of iter) {
   *     if (matchesRequest(event)) {
   *       if (isGood(event)) {
   *         return ...
   *       } else {
   *         throw ...
   *       }
   *     }
   *   }
   * }
   * ```
   */
  iterateMessageOfType<T extends IncomingMessage['type']>(
    type: T,
  ): AsyncGenerator<IncomingMessage & {type: T}, undefined> {
    // Setup the listener before the first `next()`.
    type Event = IncomingMessage & {type: T};
    // Events received while no consumer was waiting.
    const pendingEvents: Event[] = [];
    // [resolve, reject] pairs of consumers waiting for the next event.
    const pendingPromises: [(value: Event) => void, (reason: Error) => void][] = [];
    let listening = true;
    const listener = this.onMessageOfType(
      type,
      event => {
        // Hand the event to the oldest waiter if there is one, otherwise queue it.
        const resolveReject = pendingPromises.shift();
        if (resolveReject) {
          resolveReject[0](event);
        } else {
          pendingEvents.push(event);
        }
      },
      () => {
        // Teardown: fail every outstanding wait and stop the loop below.
        for (const [, reject] of pendingPromises) {
          reject(new Error('Connection was dropped'));
        }
        pendingPromises.length = 0;
        listening = false;
      },
    );

    // This is a separate function because we want to set the listener
    // immediately when the callsite calls `iterateMessageOfType`.
    return (async function* (): AsyncGenerator<Event, undefined> {
      try {
        while (listening) {
          const event = pendingEvents.shift();
          if (event === undefined) {
            // Nothing queued: wait for the next event. `yield` in an async
            // generator awaits the promise, so a rejection throws here.
            yield new Promise<Event>((resolve, reject) => {
              pendingPromises.push([resolve, reject]);
            });
          } else {
            yield event;
          }
        }
      } catch {
        // ex. connection dropped.
      } finally {
        listener.dispose();
      }
      return undefined;
    })();
  }

  /**
   * Returns the next message in the stream of `type` that also matches the given predicate.
   * The listener is removed as soon as a matching message has been seen; the
   * returned promise stays pending until then.
   */
  nextMessageMatching<T extends IncomingMessage['type']>(
    type: T,
    test: (message: IncomingMessage & {type: T}) => boolean,
  ): Promise<IncomingMessage & {type: T}> {
    const deferred = defer<IncomingMessage & {type: T}>();
    let dispose: Disposable | null = this.onMessageOfType(type, message => {
      if (test(message)) {
        dispose?.dispose();
        dispose = null;
        deferred.resolve(message);
      }
    });

    return deferred.promise;
  }

  /** Serialize `message`, send it over the message bus, and optionally log it. */
  postMessage(message: ClientToServerMessage) {
    this.messageBus.postMessage(serializeToString(message));
    if (debugLogMessageTraffic.shouldLog) {
      // eslint-disable-next-line no-console
      console.log('%c Outgoing ⮕ ', 'color:white;background-color:royalblue', message);
    }
  }

  /**
   * Call a callback when a connection is established, or reestablished after a disconnection.
   *
   * @param callback invoked on the first 'open' status and on every 'open' that
   *   follows a 'reconnecting' status; it may return a cleanup function
   * @returns a function that stops watching the bus status and runs the cleanup
   *   returned by the most recent `callback` invocation, if it was a function
   *
   * NOTE(review): a cleanup returned by an earlier invocation is overwritten on
   * reconnect without being called — confirm this is intended.
   */
  onConnectOrReconnect(callback: () => (() => unknown) | unknown): () => void {
    // Starts as true so that the very first 'open' status triggers the callback.
    let reconnecting = true;
    let disposeCallback: (() => unknown) | unknown = undefined;
    const disposable = this.messageBus.onChangeStatus(newStatus => {
      if (newStatus.type === 'reconnecting') {
        reconnecting = true;
      } else if (newStatus.type === 'open') {
        if (reconnecting) {
          disposeCallback = callback();
        }
        reconnecting = false;
      }
    });
    return () => {
      disposable.dispose();
      if (typeof disposeCallback === 'function') {
        disposeCallback();
      }
    };
  }

  /** Callbacks to run when `cwdChanged()` is called, in registration order. */
  private cwdChangeHandlers: Array<() => unknown> = [];

  /**
   * Register `cb` to run on every `cwdChanged()` call.
   *
   * @returns a function that unregisters `cb`; calling it more than once is a no-op
   */
  onCwdChanged(cb: () => unknown) {
    this.cwdChangeHandlers.push(cb);
    return () => {
      const index = this.cwdChangeHandlers.indexOf(cb);
      // `indexOf` yields -1 once `cb` is gone, and `splice(-1, 1)` would
      // remove the last (unrelated) handler instead.
      if (index !== -1) {
        this.cwdChangeHandlers.splice(index, 1);
      }
    };
  }

  /** Notify every handler registered with `onCwdChanged`. Return values are ignored. */
  cwdChanged() {
    // Iterate over a snapshot so that a handler which unsubscribes itself while
    // running does not shift the array and cause the next handler to be skipped.
    [...this.cwdChangeHandlers].forEach(handler => handler());
  }

  /**
   * Call a callback when a connection is established, or reestablished after a disconnection,
   * or the current working directory (and therefore usually repository) changes.
   *
   * @returns a function that cancels both registrations
   */
  onSetup(cb: () => (() => unknown) | unknown): () => void {
    const disposeConnectionSubscription = this.onConnectOrReconnect(cb);
    const disposeCwdChange = this.onCwdChanged(cb);

    return () => {
      disposeConnectionSubscription();
      disposeCwdChange();
    };
  }
}
b69ab31234
/** Shared channel instance, wired to the platform's message bus. */
const clientToServerAPI = new ClientToServerAPIImpl(platform.messageBus);

declare global {
  interface Window {
    /** Assigned below to the shared channel instance. */
    clientToServerAPI?: ClientToServerAPI;
  }
}
// Also make the shared instance reachable as `window.clientToServerAPI`.
window.clientToServerAPI = clientToServerAPI;

export default clientToServerAPI;