7.5 KB245 lines
Blame
1/**
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8import type {MessageBus} from './MessageBus';
9import type {ClientToServerMessage, Disposable, ServerToClientMessage} from './types';
10
11import {defer} from 'shared/utils';
12import platform from './platform';
13import {deserializeFromString, serializeToString} from './serialize';
14
15export type IncomingMessage = ServerToClientMessage;
16export type OutgoingMessage = ClientToServerMessage;
17
18export const debugLogMessageTraffic = {shouldLog: false};
19
20export interface ClientToServerAPI {
21 dispose(): void;
22 onMessageOfType<T extends IncomingMessage['type']>(
23 type: T,
24 handler: (event: IncomingMessage & {type: T}) => void | Promise<void>,
25 ): Disposable;
26
27 postMessage(message: OutgoingMessage): void;
28 onConnectOrReconnect(callback: () => unknown): () => void;
29}
30
31/**
32 * Message passing channel built on top of MessageBus.
33 * Use to send and listen for well-typed events with the server
34 */
35class ClientToServerAPIImpl implements ClientToServerAPI {
36 constructor(private messageBus: MessageBus) {}
37
38 private listenersByType = new Map<
39 string,
40 Set<(message: IncomingMessage) => void | Promise<void>>
41 >();
42 private incomingListener = this.messageBus.onMessage(event => {
43 const data = deserializeFromString(event.data as string) as IncomingMessage;
44 if (debugLogMessageTraffic.shouldLog) {
45 // eslint-disable-next-line no-console
46 console.log('%c ⬅ Incoming ', 'color:white;background-color:tomato', data);
47 }
48 const {type} = data;
49 const listeners = this.listenersByType.get(type);
50 if (!listeners) {
51 return;
52 }
53 listeners.forEach(handle => handle(data));
54 });
55
56 dispose() {
57 this.incomingListener.dispose();
58 }
59
60 onMessageOfType<T extends IncomingMessage['type']>(
61 type: T,
62 handler: (event: IncomingMessage & {type: T}) => void | Promise<void>,
63 dispose?: () => void,
64 ): Disposable {
65 let found = this.listenersByType.get(type);
66 if (found == null) {
67 found = new Set();
68 this.listenersByType.set(type, found);
69 }
70 found?.add(handler as (event: IncomingMessage) => void | Promise<void>);
71 return {
72 dispose: () => {
73 const found = this.listenersByType.get(type);
74 if (found) {
75 dispose?.();
76 found.delete(handler as (event: IncomingMessage) => void | Promise<void>);
77 }
78 },
79 };
80 }
81
82 /**
83 * Async generator that yields the given type of events.
84 * The generator ends when the connection is dropped, or if the callsite
85 * uses `break`, `return`, or `throw` to exit the loop body.
86 *
87 * The event listener will be set up immediately after calling this function,
88 * before the first iteration, and teared down when exiting the loop.
89 *
90 * Typically used in an async function, like:
91 *
92 * ```
93 * async function foo() {
94 * // Set up the listener before sending the request.
95 * const iter = clientToServerAPI.iterateMessageOfType('ResponseType');
96 * clientToServerAPI.postMessage('RequestType', ...);
97 * // Check responses until getting the one we look for.
98 * for await (const event of iter) {
99 * if (matchesRequest(event)) {
100 * if (isGood(event)) {
101 * return ...
102 * } else {
103 * throw ...
104 * }
105 * }
106 * }
107 * }
108 * ```
109 */
110 iterateMessageOfType<T extends IncomingMessage['type']>(
111 type: T,
112 ): AsyncGenerator<IncomingMessage & {type: T}, undefined> {
113 // Setup the listener before the first `next()`.
114 type Event = IncomingMessage & {type: T};
115 const pendingEvents: Event[] = [];
116 const pendingPromises: [(value: Event) => void, (reason: Error) => void][] = [];
117 let listening = true;
118 const listener = this.onMessageOfType(
119 type,
120 event => {
121 const resolveReject = pendingPromises.shift();
122 if (resolveReject) {
123 resolveReject[0](event);
124 } else {
125 pendingEvents.push(event);
126 }
127 },
128 () => {
129 for (const [, reject] of pendingPromises) {
130 reject(new Error('Connection was dropped'));
131 }
132 pendingPromises.length = 0;
133 listening = false;
134 },
135 );
136
137 // This is a separate function because we want to set the listener
138 // immediately when the callsite calls `iterateMessageOfType`.
139 return (async function* (): AsyncGenerator<Event, undefined> {
140 try {
141 while (listening) {
142 const event = pendingEvents.shift();
143 if (event === undefined) {
144 yield new Promise<Event>((resolve, reject) => {
145 pendingPromises.push([resolve, reject]);
146 });
147 } else {
148 yield event;
149 }
150 }
151 } catch {
152 // ex. connection dropped.
153 } finally {
154 listener.dispose();
155 }
156 return undefined;
157 })();
158 }
159
160 /**
161 * Returns the next message in the stream of `type` that also matches the given predicate.
162 */
163 nextMessageMatching<T extends IncomingMessage['type']>(
164 type: T,
165 test: (message: IncomingMessage & {type: T}) => boolean,
166 ): Promise<IncomingMessage & {type: T}> {
167 const deferred = defer<IncomingMessage & {type: T}>();
168 let dispose: Disposable | null = this.onMessageOfType(type, message => {
169 if (test(message)) {
170 dispose?.dispose();
171 dispose = null;
172 deferred.resolve(message);
173 }
174 });
175
176 return deferred.promise;
177 }
178
179 postMessage(message: ClientToServerMessage) {
180 this.messageBus.postMessage(serializeToString(message));
181 if (debugLogMessageTraffic.shouldLog) {
182 // eslint-disable-next-line no-console
183 console.log('%c Outgoing ⮕ ', 'color:white;background-color:royalblue', message);
184 }
185 }
186
187 /**
188 * Call a callback when a connection is established, or reestablished after a disconnection.
189 */
190 onConnectOrReconnect(callback: () => (() => unknown) | unknown): () => void {
191 let reconnecting = true;
192 let disposeCallback: (() => unknown) | unknown = undefined;
193 const disposable = this.messageBus.onChangeStatus(newStatus => {
194 if (newStatus.type === 'reconnecting') {
195 reconnecting = true;
196 } else if (newStatus.type === 'open') {
197 if (reconnecting) {
198 disposeCallback = callback();
199 }
200 reconnecting = false;
201 }
202 });
203 return () => {
204 disposable.dispose();
205 typeof disposeCallback === 'function' && disposeCallback?.();
206 };
207 }
208
209 private cwdChangeHandlers: Array<() => unknown> = [];
210 onCwdChanged(cb: () => unknown) {
211 this.cwdChangeHandlers.push(cb);
212 return () => {
213 this.cwdChangeHandlers.splice(this.cwdChangeHandlers.indexOf(cb), 1);
214 };
215 }
216 cwdChanged() {
217 this.cwdChangeHandlers.forEach(handler => handler());
218 }
219
220 /**
221 * Call a callback when a connection is established, or reestablished after a disconnection,
222 * or the current working directory (and therefore usually repository) changes.
223 */
224 onSetup(cb: () => (() => unknown) | unknown): () => void {
225 const disposeConnectionSubscription = this.onConnectOrReconnect(cb);
226 const disposeCwdChange = this.onCwdChanged(cb);
227
228 return () => {
229 disposeConnectionSubscription();
230 disposeCwdChange();
231 };
232 }
233}
234
235const clientToServerAPI = new ClientToServerAPIImpl(platform.messageBus);
236
237declare global {
238 interface Window {
239 clientToServerAPI?: ClientToServerAPI;
240 }
241}
242window.clientToServerAPI = clientToServerAPI;
243
244export default clientToServerAPI;
245