| 1 | /** |
| 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| 3 | * |
| 4 | * This source code is licensed under the MIT license found in the |
| 5 | * LICENSE file in the root directory of this source tree. |
| 6 | */ |
| 7 | |
| 8 | import type {MessageBus} from './MessageBus'; |
| 9 | import type {ClientToServerMessage, Disposable, ServerToClientMessage} from './types'; |
| 10 | |
| 11 | import {defer} from 'shared/utils'; |
| 12 | import platform from './platform'; |
| 13 | import {deserializeFromString, serializeToString} from './serialize'; |
| 14 | |
/**
 * Flag object consulted by the message channel before logging traffic to the console.
 * Set `shouldLog` to `true` to log every incoming and outgoing message.
 */
export const debugLogMessageTraffic = {shouldLog: false};

/** Messages received by the client (sent by the server). */
export type IncomingMessage = ServerToClientMessage;

/** Messages sent by the client (received by the server). */
export type OutgoingMessage = ClientToServerMessage;
| 19 | |
/**
 * Public surface of the client-to-server message channel:
 * send messages, subscribe to incoming messages by type, and react to (re)connections.
 */
export interface ClientToServerAPI {
  /** Send one message to the server. */
  postMessage(message: OutgoingMessage): void;

  /**
   * Subscribe `handler` to incoming messages whose `type` field equals `type`.
   * Dispose the returned object to unsubscribe.
   */
  onMessageOfType<TType extends IncomingMessage['type']>(
    type: TType,
    handler: (event: IncomingMessage & {type: TType}) => void | Promise<void>,
  ): Disposable;

  /**
   * Register `callback` to run on connect or reconnect.
   * The returned function cancels the registration.
   */
  onConnectOrReconnect(callback: () => unknown): () => void;

  /** Release the resources held by the channel. */
  dispose(): void;
}
| 30 | |
/**
 * Message passing channel built on top of MessageBus.
 * Use to send and listen for well-typed events with the server.
 */
class ClientToServerAPIImpl implements ClientToServerAPI {
  constructor(private messageBus: MessageBus) {}

  /** Handlers registered through `onMessageOfType`, keyed by message `type`. */
  private listenersByType = new Map<
    string,
    Set<(message: IncomingMessage) => void | Promise<void>>
  >();

  /**
   * The single subscription to the underlying message bus. Every raw event is
   * deserialized once and then passed to each handler registered for its `type`.
   */
  private incomingListener = this.messageBus.onMessage(event => {
    const data = deserializeFromString(event.data as string) as IncomingMessage;
    if (debugLogMessageTraffic.shouldLog) {
      // eslint-disable-next-line no-console
      console.log('%c ⬅ Incoming ', 'color:white;background-color:tomato', data);
    }
    const {type} = data;
    const listeners = this.listenersByType.get(type);
    if (!listeners) {
      return;
    }
    listeners.forEach(handle => handle(data));
  });

  /** Stop listening to the message bus. Registered handlers will no longer be invoked. */
  dispose() {
    this.incomingListener.dispose();
  }

  /**
   * Subscribe `handler` to incoming messages of the given `type`.
   *
   * @param type the message `type` to listen for.
   * @param handler called with every incoming message of that type.
   * @param dispose optional teardown hook, invoked when the returned Disposable is disposed.
   * @returns a Disposable that unsubscribes `handler` (and runs the `dispose` hook).
   */
  onMessageOfType<T extends IncomingMessage['type']>(
    type: T,
    handler: (event: IncomingMessage & {type: T}) => void | Promise<void>,
    dispose?: () => void,
  ): Disposable {
    // Handlers are stored under the wider IncomingMessage signature; dispatch in
    // `incomingListener` guarantees they only ever see messages of their own `type`.
    const storedHandler = handler as (event: IncomingMessage) => void | Promise<void>;
    let found = this.listenersByType.get(type);
    if (found == null) {
      found = new Set();
      this.listenersByType.set(type, found);
    }
    found.add(storedHandler);
    return {
      dispose: () => {
        const registered = this.listenersByType.get(type);
        if (registered) {
          dispose?.();
          registered.delete(storedHandler);
        }
      },
    };
  }

  /**
   * Async generator that yields the given type of events.
   * The generator ends when its listener is torn down, or if the callsite
   * uses `break`, `return`, or `throw` to exit the loop body.
   *
   * The event listener will be set up immediately after calling this function,
   * before the first iteration, and torn down when exiting the loop.
   *
   * NOTE(review): the teardown hook rejects pending waits with "Connection was dropped",
   * but within this class it only runs from the generator's own `finally` block;
   * confirm whether something else is meant to trigger it on connection loss.
   *
   * Typically used in an async function, like:
   *
   * ```
   * async function foo() {
   *   // Set up the listener before sending the request.
   *   const iter = clientToServerAPI.iterateMessageOfType('ResponseType');
   *   clientToServerAPI.postMessage({type: 'RequestType', ...});
   *   // Check responses until getting the one we look for.
   *   for await (const event of iter) {
   *     if (matchesRequest(event)) {
   *       if (isGood(event)) {
   *         return ...
   *       } else {
   *         throw ...
   *       }
   *     }
   *   }
   * }
   * ```
   */
  iterateMessageOfType<T extends IncomingMessage['type']>(
    type: T,
  ): AsyncGenerator<IncomingMessage & {type: T}, undefined> {
    // Setup the listener before the first `next()`.
    type Event = IncomingMessage & {type: T};
    // Events that arrived while nobody was waiting for one.
    const pendingEvents: Event[] = [];
    // [resolve, reject] pairs of consumers waiting for an event that has not arrived yet.
    const pendingPromises: [(value: Event) => void, (reason: Error) => void][] = [];
    let listening = true;
    const listener = this.onMessageOfType(
      type,
      event => {
        // Hand the event to the oldest waiter if there is one, otherwise buffer it.
        const resolveReject = pendingPromises.shift();
        if (resolveReject) {
          resolveReject[0](event);
        } else {
          pendingEvents.push(event);
        }
      },
      () => {
        // Teardown: fail everyone still waiting and stop the generator loop.
        for (const [, reject] of pendingPromises) {
          reject(new Error('Connection was dropped'));
        }
        pendingPromises.length = 0;
        listening = false;
      },
    );

    // This is a separate function because we want to set the listener
    // immediately when the callsite calls `iterateMessageOfType`.
    return (async function* (): AsyncGenerator<Event, undefined> {
      try {
        while (listening) {
          const event = pendingEvents.shift();
          if (event === undefined) {
            // Nothing buffered: yield a promise that the listener resolves on the next event.
            yield new Promise<Event>((resolve, reject) => {
              pendingPromises.push([resolve, reject]);
            });
          } else {
            yield event;
          }
        }
      } catch {
        // ex. connection dropped.
      } finally {
        listener.dispose();
      }
      return undefined;
    })();
  }

  /**
   * Returns the next message in the stream of `type` that also matches the given predicate.
   * The listener is removed as soon as a matching message is seen; the returned promise
   * stays pending until then.
   */
  nextMessageMatching<T extends IncomingMessage['type']>(
    type: T,
    test: (message: IncomingMessage & {type: T}) => boolean,
  ): Promise<IncomingMessage & {type: T}> {
    const deferred = defer<IncomingMessage & {type: T}>();
    let subscription: Disposable | null = this.onMessageOfType(type, message => {
      if (test(message)) {
        subscription?.dispose();
        subscription = null;
        deferred.resolve(message);
      }
    });

    return deferred.promise;
  }

  /** Serialize `message` and send it to the server over the message bus. */
  postMessage(message: ClientToServerMessage) {
    this.messageBus.postMessage(serializeToString(message));
    if (debugLogMessageTraffic.shouldLog) {
      // eslint-disable-next-line no-console
      console.log('%c Outgoing ⮕ ', 'color:white;background-color:royalblue', message);
    }
  }

  /**
   * Call a callback when a connection is established, or reestablished after a disconnection.
   *
   * If `callback` returns a function, it is treated as a cleanup and is invoked when the
   * returned unsubscribe function is called. Only the cleanup from the most recent
   * invocation of `callback` is retained.
   *
   * @returns a function that stops listening for status changes and runs the latest cleanup.
   */
  onConnectOrReconnect(callback: () => (() => unknown) | unknown): () => void {
    // Starts as `true` so that the first 'open' status counts as a (re)connection.
    let reconnecting = true;
    let disposeCallback: (() => unknown) | unknown = undefined;
    const disposable = this.messageBus.onChangeStatus(newStatus => {
      if (newStatus.type === 'reconnecting') {
        reconnecting = true;
      } else if (newStatus.type === 'open') {
        if (reconnecting) {
          disposeCallback = callback();
        }
        reconnecting = false;
      }
    });
    return () => {
      disposable.dispose();
      if (typeof disposeCallback === 'function') {
        disposeCallback();
      }
    };
  }

  /** Callbacks to run whenever `cwdChanged()` is called. */
  private cwdChangeHandlers: Array<() => unknown> = [];

  /**
   * Register a callback to run whenever `cwdChanged()` is called.
   *
   * @returns a function that unregisters `cb`. Calling it more than once is a no-op.
   */
  onCwdChanged(cb: () => unknown): () => void {
    this.cwdChangeHandlers.push(cb);
    return () => {
      const index = this.cwdChangeHandlers.indexOf(cb);
      // `splice(-1, 1)` would remove the *last* handler, so a repeated dispose
      // must not reach `splice` once `cb` is already gone.
      if (index !== -1) {
        this.cwdChangeHandlers.splice(index, 1);
      }
    };
  }

  /** Notify every callback registered via `onCwdChanged`. */
  cwdChanged() {
    // Iterate over a snapshot so a handler that unregisters itself while being
    // notified does not cause the next handler to be skipped.
    for (const handler of [...this.cwdChangeHandlers]) {
      handler();
    }
  }

  /**
   * Call a callback when a connection is established, or reestablished after a disconnection,
   * or the current working directory (and therefore usually repository) changes.
   *
   * @returns a function that cancels both subscriptions.
   */
  onSetup(cb: () => (() => unknown) | unknown): () => void {
    const disposeConnectionSubscription = this.onConnectOrReconnect(cb);
    const disposeCwdChange = this.onCwdChanged(cb);

    return () => {
      disposeConnectionSubscription();
      disposeCwdChange();
    };
  }
}
| 234 | |
declare global {
  interface Window {
    /** The shared channel instance, also attached to `window`. */
    clientToServerAPI?: ClientToServerAPI;
  }
}

/** Shared channel instance, bound to the platform's message bus. */
const clientToServerAPI = new ClientToServerAPIImpl(platform.messageBus);
window.clientToServerAPI = clientToServerAPI;

export default clientToServerAPI;
| 245 | |