| b69ab31 | | | 1 | /** |
| b69ab31 | | | 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| b69ab31 | | | 3 | * |
| b69ab31 | | | 4 | * This source code is licensed under the MIT license found in the |
| b69ab31 | | | 5 | * LICENSE file in the root directory of this source tree. |
| b69ab31 | | | 6 | */ |
| b69ab31 | | | 7 | |
| b69ab31 | | | 8 | import type {MessageBus} from './MessageBus'; |
| b69ab31 | | | 9 | import type {ClientToServerMessage, Disposable, ServerToClientMessage} from './types'; |
| b69ab31 | | | 10 | |
| b69ab31 | | | 11 | import {defer} from 'shared/utils'; |
| b69ab31 | | | 12 | import platform from './platform'; |
| b69ab31 | | | 13 | import {deserializeFromString, serializeToString} from './serialize'; |
| b69ab31 | | | 14 | |
| b69ab31 | | | 15 | export type IncomingMessage = ServerToClientMessage; |
| b69ab31 | | | 16 | export type OutgoingMessage = ClientToServerMessage; |
| b69ab31 | | | 17 | |
| b69ab31 | | | 18 | export const debugLogMessageTraffic = {shouldLog: false}; |
| b69ab31 | | | 19 | |
| b69ab31 | | | 20 | export interface ClientToServerAPI { |
| b69ab31 | | | 21 | dispose(): void; |
| b69ab31 | | | 22 | onMessageOfType<T extends IncomingMessage['type']>( |
| b69ab31 | | | 23 | type: T, |
| b69ab31 | | | 24 | handler: (event: IncomingMessage & {type: T}) => void | Promise<void>, |
| b69ab31 | | | 25 | ): Disposable; |
| b69ab31 | | | 26 | |
| b69ab31 | | | 27 | postMessage(message: OutgoingMessage): void; |
| b69ab31 | | | 28 | onConnectOrReconnect(callback: () => unknown): () => void; |
| b69ab31 | | | 29 | } |
| b69ab31 | | | 30 | |
| b69ab31 | | | 31 | /** |
| b69ab31 | | | 32 | * Message passing channel built on top of MessageBus. |
| b69ab31 | | | 33 | * Use to send and listen for well-typed events with the server |
| b69ab31 | | | 34 | */ |
| b69ab31 | | | 35 | class ClientToServerAPIImpl implements ClientToServerAPI { |
| b69ab31 | | | 36 | constructor(private messageBus: MessageBus) {} |
| b69ab31 | | | 37 | |
| b69ab31 | | | 38 | private listenersByType = new Map< |
| b69ab31 | | | 39 | string, |
| b69ab31 | | | 40 | Set<(message: IncomingMessage) => void | Promise<void>> |
| b69ab31 | | | 41 | >(); |
| b69ab31 | | | 42 | private incomingListener = this.messageBus.onMessage(event => { |
| b69ab31 | | | 43 | const data = deserializeFromString(event.data as string) as IncomingMessage; |
| b69ab31 | | | 44 | if (debugLogMessageTraffic.shouldLog) { |
| b69ab31 | | | 45 | // eslint-disable-next-line no-console |
| b69ab31 | | | 46 | console.log('%c ⬅ Incoming ', 'color:white;background-color:tomato', data); |
| b69ab31 | | | 47 | } |
| b69ab31 | | | 48 | const {type} = data; |
| b69ab31 | | | 49 | const listeners = this.listenersByType.get(type); |
| b69ab31 | | | 50 | if (!listeners) { |
| b69ab31 | | | 51 | return; |
| b69ab31 | | | 52 | } |
| b69ab31 | | | 53 | listeners.forEach(handle => handle(data)); |
| b69ab31 | | | 54 | }); |
| b69ab31 | | | 55 | |
| b69ab31 | | | 56 | dispose() { |
| b69ab31 | | | 57 | this.incomingListener.dispose(); |
| b69ab31 | | | 58 | } |
| b69ab31 | | | 59 | |
| b69ab31 | | | 60 | onMessageOfType<T extends IncomingMessage['type']>( |
| b69ab31 | | | 61 | type: T, |
| b69ab31 | | | 62 | handler: (event: IncomingMessage & {type: T}) => void | Promise<void>, |
| b69ab31 | | | 63 | dispose?: () => void, |
| b69ab31 | | | 64 | ): Disposable { |
| b69ab31 | | | 65 | let found = this.listenersByType.get(type); |
| b69ab31 | | | 66 | if (found == null) { |
| b69ab31 | | | 67 | found = new Set(); |
| b69ab31 | | | 68 | this.listenersByType.set(type, found); |
| b69ab31 | | | 69 | } |
| b69ab31 | | | 70 | found?.add(handler as (event: IncomingMessage) => void | Promise<void>); |
| b69ab31 | | | 71 | return { |
| b69ab31 | | | 72 | dispose: () => { |
| b69ab31 | | | 73 | const found = this.listenersByType.get(type); |
| b69ab31 | | | 74 | if (found) { |
| b69ab31 | | | 75 | dispose?.(); |
| b69ab31 | | | 76 | found.delete(handler as (event: IncomingMessage) => void | Promise<void>); |
| b69ab31 | | | 77 | } |
| b69ab31 | | | 78 | }, |
| b69ab31 | | | 79 | }; |
| b69ab31 | | | 80 | } |
| b69ab31 | | | 81 | |
| b69ab31 | | | 82 | /** |
| b69ab31 | | | 83 | * Async generator that yields the given type of events. |
| b69ab31 | | | 84 | * The generator ends when the connection is dropped, or if the callsite |
| b69ab31 | | | 85 | * uses `break`, `return`, or `throw` to exit the loop body. |
| b69ab31 | | | 86 | * |
| b69ab31 | | | 87 | * The event listener will be set up immediately after calling this function, |
| b69ab31 | | | 88 | * before the first iteration, and teared down when exiting the loop. |
| b69ab31 | | | 89 | * |
| b69ab31 | | | 90 | * Typically used in an async function, like: |
| b69ab31 | | | 91 | * |
| b69ab31 | | | 92 | * ``` |
| b69ab31 | | | 93 | * async function foo() { |
| b69ab31 | | | 94 | * // Set up the listener before sending the request. |
| b69ab31 | | | 95 | * const iter = clientToServerAPI.iterateMessageOfType('ResponseType'); |
| b69ab31 | | | 96 | * clientToServerAPI.postMessage('RequestType', ...); |
| b69ab31 | | | 97 | * // Check responses until getting the one we look for. |
| b69ab31 | | | 98 | * for await (const event of iter) { |
| b69ab31 | | | 99 | * if (matchesRequest(event)) { |
| b69ab31 | | | 100 | * if (isGood(event)) { |
| b69ab31 | | | 101 | * return ... |
| b69ab31 | | | 102 | * } else { |
| b69ab31 | | | 103 | * throw ... |
| b69ab31 | | | 104 | * } |
| b69ab31 | | | 105 | * } |
| b69ab31 | | | 106 | * } |
| b69ab31 | | | 107 | * } |
| b69ab31 | | | 108 | * ``` |
| b69ab31 | | | 109 | */ |
| b69ab31 | | | 110 | iterateMessageOfType<T extends IncomingMessage['type']>( |
| b69ab31 | | | 111 | type: T, |
| b69ab31 | | | 112 | ): AsyncGenerator<IncomingMessage & {type: T}, undefined> { |
| b69ab31 | | | 113 | // Setup the listener before the first `next()`. |
| b69ab31 | | | 114 | type Event = IncomingMessage & {type: T}; |
| b69ab31 | | | 115 | const pendingEvents: Event[] = []; |
| b69ab31 | | | 116 | const pendingPromises: [(value: Event) => void, (reason: Error) => void][] = []; |
| b69ab31 | | | 117 | let listening = true; |
| b69ab31 | | | 118 | const listener = this.onMessageOfType( |
| b69ab31 | | | 119 | type, |
| b69ab31 | | | 120 | event => { |
| b69ab31 | | | 121 | const resolveReject = pendingPromises.shift(); |
| b69ab31 | | | 122 | if (resolveReject) { |
| b69ab31 | | | 123 | resolveReject[0](event); |
| b69ab31 | | | 124 | } else { |
| b69ab31 | | | 125 | pendingEvents.push(event); |
| b69ab31 | | | 126 | } |
| b69ab31 | | | 127 | }, |
| b69ab31 | | | 128 | () => { |
| b69ab31 | | | 129 | for (const [, reject] of pendingPromises) { |
| b69ab31 | | | 130 | reject(new Error('Connection was dropped')); |
| b69ab31 | | | 131 | } |
| b69ab31 | | | 132 | pendingPromises.length = 0; |
| b69ab31 | | | 133 | listening = false; |
| b69ab31 | | | 134 | }, |
| b69ab31 | | | 135 | ); |
| b69ab31 | | | 136 | |
| b69ab31 | | | 137 | // This is a separate function because we want to set the listener |
| b69ab31 | | | 138 | // immediately when the callsite calls `iterateMessageOfType`. |
| b69ab31 | | | 139 | return (async function* (): AsyncGenerator<Event, undefined> { |
| b69ab31 | | | 140 | try { |
| b69ab31 | | | 141 | while (listening) { |
| b69ab31 | | | 142 | const event = pendingEvents.shift(); |
| b69ab31 | | | 143 | if (event === undefined) { |
| b69ab31 | | | 144 | yield new Promise<Event>((resolve, reject) => { |
| b69ab31 | | | 145 | pendingPromises.push([resolve, reject]); |
| b69ab31 | | | 146 | }); |
| b69ab31 | | | 147 | } else { |
| b69ab31 | | | 148 | yield event; |
| b69ab31 | | | 149 | } |
| b69ab31 | | | 150 | } |
| b69ab31 | | | 151 | } catch { |
| b69ab31 | | | 152 | // ex. connection dropped. |
| b69ab31 | | | 153 | } finally { |
| b69ab31 | | | 154 | listener.dispose(); |
| b69ab31 | | | 155 | } |
| b69ab31 | | | 156 | return undefined; |
| b69ab31 | | | 157 | })(); |
| b69ab31 | | | 158 | } |
| b69ab31 | | | 159 | |
| b69ab31 | | | 160 | /** |
| b69ab31 | | | 161 | * Returns the next message in the stream of `type` that also matches the given predicate. |
| b69ab31 | | | 162 | */ |
| b69ab31 | | | 163 | nextMessageMatching<T extends IncomingMessage['type']>( |
| b69ab31 | | | 164 | type: T, |
| b69ab31 | | | 165 | test: (message: IncomingMessage & {type: T}) => boolean, |
| b69ab31 | | | 166 | ): Promise<IncomingMessage & {type: T}> { |
| b69ab31 | | | 167 | const deferred = defer<IncomingMessage & {type: T}>(); |
| b69ab31 | | | 168 | let dispose: Disposable | null = this.onMessageOfType(type, message => { |
| b69ab31 | | | 169 | if (test(message)) { |
| b69ab31 | | | 170 | dispose?.dispose(); |
| b69ab31 | | | 171 | dispose = null; |
| b69ab31 | | | 172 | deferred.resolve(message); |
| b69ab31 | | | 173 | } |
| b69ab31 | | | 174 | }); |
| b69ab31 | | | 175 | |
| b69ab31 | | | 176 | return deferred.promise; |
| b69ab31 | | | 177 | } |
| b69ab31 | | | 178 | |
| b69ab31 | | | 179 | postMessage(message: ClientToServerMessage) { |
| b69ab31 | | | 180 | this.messageBus.postMessage(serializeToString(message)); |
| b69ab31 | | | 181 | if (debugLogMessageTraffic.shouldLog) { |
| b69ab31 | | | 182 | // eslint-disable-next-line no-console |
| b69ab31 | | | 183 | console.log('%c Outgoing ⮕ ', 'color:white;background-color:royalblue', message); |
| b69ab31 | | | 184 | } |
| b69ab31 | | | 185 | } |
| b69ab31 | | | 186 | |
| b69ab31 | | | 187 | /** |
| b69ab31 | | | 188 | * Call a callback when a connection is established, or reestablished after a disconnection. |
| b69ab31 | | | 189 | */ |
| b69ab31 | | | 190 | onConnectOrReconnect(callback: () => (() => unknown) | unknown): () => void { |
| b69ab31 | | | 191 | let reconnecting = true; |
| b69ab31 | | | 192 | let disposeCallback: (() => unknown) | unknown = undefined; |
| b69ab31 | | | 193 | const disposable = this.messageBus.onChangeStatus(newStatus => { |
| b69ab31 | | | 194 | if (newStatus.type === 'reconnecting') { |
| b69ab31 | | | 195 | reconnecting = true; |
| b69ab31 | | | 196 | } else if (newStatus.type === 'open') { |
| b69ab31 | | | 197 | if (reconnecting) { |
| b69ab31 | | | 198 | disposeCallback = callback(); |
| b69ab31 | | | 199 | } |
| b69ab31 | | | 200 | reconnecting = false; |
| b69ab31 | | | 201 | } |
| b69ab31 | | | 202 | }); |
| b69ab31 | | | 203 | return () => { |
| b69ab31 | | | 204 | disposable.dispose(); |
| b69ab31 | | | 205 | typeof disposeCallback === 'function' && disposeCallback?.(); |
| b69ab31 | | | 206 | }; |
| b69ab31 | | | 207 | } |
| b69ab31 | | | 208 | |
| b69ab31 | | | 209 | private cwdChangeHandlers: Array<() => unknown> = []; |
| b69ab31 | | | 210 | onCwdChanged(cb: () => unknown) { |
| b69ab31 | | | 211 | this.cwdChangeHandlers.push(cb); |
| b69ab31 | | | 212 | return () => { |
| b69ab31 | | | 213 | this.cwdChangeHandlers.splice(this.cwdChangeHandlers.indexOf(cb), 1); |
| b69ab31 | | | 214 | }; |
| b69ab31 | | | 215 | } |
| b69ab31 | | | 216 | cwdChanged() { |
| b69ab31 | | | 217 | this.cwdChangeHandlers.forEach(handler => handler()); |
| b69ab31 | | | 218 | } |
| b69ab31 | | | 219 | |
| b69ab31 | | | 220 | /** |
| b69ab31 | | | 221 | * Call a callback when a connection is established, or reestablished after a disconnection, |
| b69ab31 | | | 222 | * or the current working directory (and therefore usually repository) changes. |
| b69ab31 | | | 223 | */ |
| b69ab31 | | | 224 | onSetup(cb: () => (() => unknown) | unknown): () => void { |
| b69ab31 | | | 225 | const disposeConnectionSubscription = this.onConnectOrReconnect(cb); |
| b69ab31 | | | 226 | const disposeCwdChange = this.onCwdChanged(cb); |
| b69ab31 | | | 227 | |
| b69ab31 | | | 228 | return () => { |
| b69ab31 | | | 229 | disposeConnectionSubscription(); |
| b69ab31 | | | 230 | disposeCwdChange(); |
| b69ab31 | | | 231 | }; |
| b69ab31 | | | 232 | } |
| b69ab31 | | | 233 | } |
| b69ab31 | | | 234 | |
| b69ab31 | | | 235 | const clientToServerAPI = new ClientToServerAPIImpl(platform.messageBus); |
| b69ab31 | | | 236 | |
| b69ab31 | | | 237 | declare global { |
| b69ab31 | | | 238 | interface Window { |
| b69ab31 | | | 239 | clientToServerAPI?: ClientToServerAPI; |
| b69ab31 | | | 240 | } |
| b69ab31 | | | 241 | } |
| b69ab31 | | | 242 | window.clientToServerAPI = clientToServerAPI; |
| b69ab31 | | | 243 | |
| b69ab31 | | | 244 | export default clientToServerAPI; |