From 5c21c3456806368ceecafb5dd10bd93f3bf88406 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 29 Jan 2024 15:17:55 -0800 Subject: [PATCH] Support telemetry batching and move WebSocket handling to worker (#7391) * Support subscription batching from API, Tables, and Plots * Added batching worker * Added configurable batch size and throttling rate * Support batch size based throttling * Default to latest strategy * Don't hide original error * Added copyright statement * Renamed BatchingWebSocketProvider to BatchingWebSocket * Adding docs * renamed class. changed throttling strategy to be driven by the main thread * Renamed classes * Added more documentation * Fixed broken tests * Addressed review comments * Clean up and reconnect on websocket close * Better management of subscription strategies * Add tests to catch edge cases where two subscribers request different strategies * Ensure callbacks are invoked with telemetry in the requested format * Remove console out. Oops * Fix linting errors --- .cspell.json | 3 +- src/api/telemetry/BatchingWebSocket.js | 194 ++++++++++ src/api/telemetry/TelemetryAPI.js | 123 ++++++- src/api/telemetry/TelemetryAPISpec.js | 132 ++++++- src/api/telemetry/TelemetryCollection.js | 5 +- src/api/telemetry/WebSocketWorker.js | 366 +++++++++++++++++++ src/plugins/plot/configuration/Model.js | 4 +- src/plugins/plot/configuration/PlotSeries.js | 47 ++- src/plugins/remoteClock/RemoteClock.js | 35 +- 9 files changed, 847 insertions(+), 62 deletions(-) create mode 100644 src/api/telemetry/BatchingWebSocket.js create mode 100644 src/api/telemetry/WebSocketWorker.js diff --git a/.cspell.json b/.cspell.json index 82e4d7df12..31df39a88e 100644 --- a/.cspell.json +++ b/.cspell.json @@ -493,7 +493,8 @@ "WCAG", "stackedplot", "Andale", - "checksnapshots" + "checksnapshots", + "specced" ], "dictionaries": ["npm", "softwareTerms", "node", "html", "css", "bash", "en_US"], "ignorePaths": [ diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js new file mode 100644 index 0000000000..204e6e9e70 --- /dev/null +++ b/src/api/telemetry/BatchingWebSocket.js @@ -0,0 +1,194 @@ +/***************************************************************************** + * Open MCT Web, Copyright (c) 2014-2024, United States Government + * as represented by the Administrator of the National Aeronautics and Space + * Administration. All rights reserved. + * + * Open MCT Web is licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * Open MCT Web includes source code licensed under additional open source + * licenses. See the Open Source Licenses file (LICENSES.md) included with + * this source code distribution or the Licensing information page available + * at runtime from the About dialog for additional information. + *****************************************************************************/ +import installWorker from './WebSocketWorker.js'; +const DEFAULT_RATE_MS = 1000; +/** + * Describes the strategy to be used when batching WebSocket messages + * + * @typedef BatchingStrategy + * @property {Function} shouldBatchMessage a function that accepts a single + * argument - the raw message received from the websocket. Every message + * received will be evaluated against this function so it should be performant. + * Note also that this function is executed in a worker, so it must be + * completely self-contained with no external dependencies. The function + * should return `true` if the message should be batched, and `false` if not. + * @property {Function} getBatchIdFromMessage a function that accepts a + * single argument - the raw message received from the websocket. Only messages + * where `shouldBatchMessage` has evaluated to true will be passed into this + * function. The function should return a unique value on which to batch the + * messages. For example a telemetry, channel, or parameter identifier. + */ +/** + * Provides a reliable and convenient WebSocket abstraction layer that handles + * a lot of boilerplate common to managing WebSocket connections such as: + * - Establishing a WebSocket connection to a server + * - Reconnecting on error, with a fallback strategy + * - Queuing messages so that clients can send messages without concern for the current + * connection state of the WebSocket. + * + * The WebSocket that it manages is based in a dedicated worker so that network + * concerns are not handled on the main event loop. This allows for performant receipt + * and batching of messages without blocking either the UI or server. + * + * @memberof module:openmct.telemetry + */ +class BatchingWebSocket extends EventTarget { + #worker; + #openmct; + #showingRateLimitNotification; + #rate; + + constructor(openmct) { + super(); + // Install worker, register listeners etc. + const workerFunction = `(${installWorker.toString()})()`; + const workerBlob = new Blob([workerFunction]); + const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' }); + this.#worker = new Worker(workerUrl); + this.#openmct = openmct; + this.#showingRateLimitNotification = false; + this.#rate = DEFAULT_RATE_MS; + + const routeMessageToHandler = this.#routeMessageToHandler.bind(this); + this.#worker.addEventListener('message', routeMessageToHandler); + openmct.on( + 'destroy', + () => { + this.disconnect(); + URL.revokeObjectURL(workerUrl); + }, + { once: true } + ); + } + + /** + * Will establish a WebSocket connection to the provided url + * @param {string} url The URL to connect to + */ + connect(url) { + this.#worker.postMessage({ + type: 'connect', + url + }); + + this.#readyForNextBatch(); + } + + #readyForNextBatch() { + this.#worker.postMessage({ + type: 'readyForNextBatch' + }); + } + + /** + * Send a message to the WebSocket. + * @param {any} message The message to send. Can be any type supported by WebSockets. + * See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data + */ + sendMessage(message) { + this.#worker.postMessage({ + type: 'message', + message + }); + } + + /** + * Set the strategy used to both decide which raw messages to batch, and how to group + * them. + * @param {BatchingStrategy} strategy The batching strategy to use when evaluating + * raw messages from the WebSocket. + */ + setBatchingStrategy(strategy) { + const serializedStrategy = { + shouldBatchMessage: strategy.shouldBatchMessage.toString(), + getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString() + }; + + this.#worker.postMessage({ + type: 'setBatchingStrategy', + serializedStrategy + }); + } + + /** + * When using batching, sets the rate at which batches of messages are released. + * @param {Number} rate the amount of time to wait, in ms, between batches. + */ + setRate(rate) { + this.#rate = rate; + } + + /** + * @param {Number} maxBatchSize the maximum length of a batch of messages. For example, + * the maximum number of telemetry values to batch before dropping them + * Note that this is a fail-safe that is only invoked if performance drops to the + * point where Open MCT cannot keep up with the amount of telemetry it is receiving. + * In this event it will sacrifice the oldest telemetry in the batch in favor of the + * most recent telemetry. The user will be informed that telemetry has been dropped. + * + * This should be set appropriately for the expected data rate. eg. If telemetry + * is received at 10Hz for each telemetry point, then a minimal combination of batch + * size and rate is 10 and 1000 respectively. Ideally you would add some margin, so + * 15 would probably be a better batch size. + */ + setMaxBatchSize(maxBatchSize) { + this.#worker.postMessage({ + type: 'setMaxBatchSize', + maxBatchSize + }); + } + + /** + * Disconnect the associated WebSocket. Generally speaking there is no need to call + * this manually. + */ + disconnect() { + this.#worker.postMessage({ + type: 'disconnect' + }); + } + + #routeMessageToHandler(message) { + if (message.data.type === 'batch') { + if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) { + const notification = this.#openmct.notifications.alert( + 'Telemetry dropped due to client rate limiting.', + { hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' } + ); + this.#showingRateLimitNotification = true; + notification.once('minimized', () => { + this.#showingRateLimitNotification = false; + }); + } + this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); + setTimeout(() => { + this.#readyForNextBatch(); + }, this.#rate); + } else if (message.data.type === 'message') { + this.dispatchEvent(new CustomEvent('message', { detail: message.data.message })); + } else { + throw new Error(`Unknown message type: ${message.data.type}`); + } + } +} + +export default BatchingWebSocket; diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index d29bf231a9..e6aa50bda6 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -23,6 +23,7 @@ import objectUtils from 'objectUtils'; import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js'; +import BatchingWebSocket from './BatchingWebSocket.js'; import DefaultMetadataProvider from './DefaultMetadataProvider.js'; import TelemetryCollection from './TelemetryCollection.js'; import TelemetryMetadataManager from './TelemetryMetadataManager.js'; @@ -54,6 +55,28 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js'; * @memberof module:openmct.TelemetryAPI~ */ +/** + * Describes and bounds requests for telemetry data. + * + * @typedef TelemetrySubscriptionOptions + * @property {String} [strategy] symbolic identifier directing providers on how + * to handle telemetry subscriptions. The default behavior is 'latest' which will + * always return a single telemetry value with each callback, and in the event + * of throttling will always prioritize the latest data, meaning intermediate + * data will be skipped. Alternatively, the `batch` strategy can be used, which + * will return all telemetry values since the last callback. This strategy is + * useful for cases where intermediate data is important, such as when + * rendering a telemetry plot or table. If `batch` is specified, the subscription + * callback will be invoked with an Array. + * + * @memberof module:openmct.TelemetryAPI~ + */ + +const SUBSCRIBE_STRATEGY = { + LATEST: 'latest', + BATCH: 'batch' +}; + /** * Utilities for telemetry * @interface TelemetryAPI @@ -61,6 +84,11 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js'; */ export default class TelemetryAPI { #isGreedyLAD; + #subscribeCache; + + get SUBSCRIBE_STRATEGY() { + return SUBSCRIBE_STRATEGY; + } constructor(openmct) { this.openmct = openmct; @@ -78,6 +106,8 @@ export default class TelemetryAPI { this.valueFormatterCache = new WeakMap(); this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry(); this.#isGreedyLAD = true; + this.BatchingWebSocket = BatchingWebSocket; + this.#subscribeCache = {}; } abortAllRequests() { @@ -378,54 +408,111 @@ export default class TelemetryAPI { * @memberof module:openmct.TelemetryAPI~TelemetryProvider# * @param {module:openmct.DomainObject} domainObject the object * which has associated telemetry - * @param {TelemetryRequestOptions} options configuration items for subscription + * @param {TelemetrySubscriptionOptions} options configuration items for subscription * @param {Function} callback the callback to invoke with new data, as * it becomes available * @returns {Function} a function which may be called to terminate * the subscription */ - subscribe(domainObject, callback, options) { + subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) { + const requestedStrategy = options.strategy || SUBSCRIBE_STRATEGY.LATEST; + if (domainObject.type === 'unknown') { return () => {}; } - const provider = this.findSubscriptionProvider(domainObject); + const provider = this.findSubscriptionProvider(domainObject, options); + const supportsBatching = + Boolean(provider?.supportsBatching) && provider?.supportsBatching(domainObject, options); - if (!this.subscribeCache) { - this.subscribeCache = {}; + if (!this.#subscribeCache) { + this.#subscribeCache = {}; } const keyString = objectUtils.makeKeyString(domainObject.identifier); - let subscriber = this.subscribeCache[keyString]; + const supportedStrategy = supportsBatching ? requestedStrategy : SUBSCRIBE_STRATEGY.LATEST; + // Override the requested strategy with the strategy supported by the provider + const optionsWithSupportedStrategy = { + ...options, + strategy: supportedStrategy + }; + // If batching is supported, we need to cache a subscription for each strategy - + // latest and batched. + const cacheKey = `${keyString}:${supportedStrategy}`; + let subscriber = this.#subscribeCache[cacheKey]; if (!subscriber) { - subscriber = this.subscribeCache[keyString] = { - callbacks: [callback] + subscriber = this.#subscribeCache[cacheKey] = { + latestCallbacks: [], + batchCallbacks: [] }; if (provider) { subscriber.unsubscribe = provider.subscribe( domainObject, - function (value) { - subscriber.callbacks.forEach(function (cb) { - cb(value); - }); - }, - options + invokeCallbackWithRequestedStrategy, + optionsWithSupportedStrategy ); } else { subscriber.unsubscribe = function () {}; } + } + + if (requestedStrategy === SUBSCRIBE_STRATEGY.BATCH) { + subscriber.batchCallbacks.push(callback); } else { - subscriber.callbacks.push(callback); + subscriber.latestCallbacks.push(callback); + } + + // Guarantees that view receive telemetry in the expected form + function invokeCallbackWithRequestedStrategy(data) { + invokeCallbacksWithArray(data, subscriber.batchCallbacks); + invokeCallbacksWithSingleValue(data, subscriber.latestCallbacks); + } + + function invokeCallbacksWithArray(data, batchCallbacks) { + // + if (data === undefined || data === null || data.length === 0) { + throw new Error( + 'Attempt to invoke telemetry subscription callback with no telemetry datum' + ); + } + + if (!Array.isArray(data)) { + data = [data]; + } + + batchCallbacks.forEach((cb) => { + cb(data); + }); + } + + function invokeCallbacksWithSingleValue(data, latestCallbacks) { + if (Array.isArray(data)) { + data = data[data.length - 1]; + } + + if (data === undefined || data === null) { + throw new Error( + 'Attempt to invoke telemetry subscription callback with no telemetry datum' + ); + } + + latestCallbacks.forEach((cb) => { + cb(data); + }); } return function unsubscribe() { - subscriber.callbacks = subscriber.callbacks.filter(function (cb) { + subscriber.latestCallbacks = subscriber.latestCallbacks.filter(function (cb) { return cb !== callback; }); - if (subscriber.callbacks.length === 0) { + subscriber.batchCallbacks = subscriber.batchCallbacks.filter(function (cb) { + return cb !== callback; + }); + + if (subscriber.latestCallbacks.length === 0 && subscriber.batchCallbacks.length === 0) { subscriber.unsubscribe(); - delete this.subscribeCache[keyString]; + delete this.#subscribeCache[cacheKey]; } }.bind(this); } diff --git a/src/api/telemetry/TelemetryAPISpec.js b/src/api/telemetry/TelemetryAPISpec.js index 59ff14b3bf..5cf221de51 100644 --- a/src/api/telemetry/TelemetryAPISpec.js +++ b/src/api/telemetry/TelemetryAPISpec.js @@ -90,7 +90,9 @@ describe('Telemetry API', () => { const callback = jasmine.createSpy('callback'); const unsubscribe = telemetryAPI.subscribe(domainObject, callback); - expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject); + expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, { + strategy: 'latest' + }); expect(telemetryProvider.subscribe).not.toHaveBeenCalled(); expect(unsubscribe).toEqual(jasmine.any(Function)); @@ -111,12 +113,16 @@ describe('Telemetry API', () => { const callback = jasmine.createSpy('callback'); const unsubscribe = telemetryAPI.subscribe(domainObject, callback); expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1); - expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject); + expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, { + strategy: 'latest' + }); expect(telemetryProvider.subscribe.calls.count()).toBe(1); expect(telemetryProvider.subscribe).toHaveBeenCalledWith( domainObject, jasmine.any(Function), - undefined + { + strategy: 'latest' + } ); const notify = telemetryProvider.subscribe.calls.mostRecent().args[1]; @@ -321,6 +327,126 @@ describe('Telemetry API', () => { signal }); }); + describe('telemetry batching support', () => { + let callbacks; + let unsubFunc; + + beforeEach(() => { + callbacks = []; + unsubFunc = jasmine.createSpy('unsubscribe'); + telemetryProvider.supportsBatching = jasmine.createSpy('supportsBatching'); + telemetryProvider.supportsBatching.and.returnValue(true); + telemetryProvider.supportsSubscribe.and.returnValue(true); + + telemetryProvider.subscribe.and.callFake(function (obj, cb, options) { + callbacks.push(cb); + + return unsubFunc; + }); + + telemetryAPI.addProvider(telemetryProvider); + }); + + it('caches subscriptions for batched and latest telemetry subscriptions', () => { + const latestCallback1 = jasmine.createSpy('latestCallback1'); + const unsubscribeFromLatest1 = telemetryAPI.subscribe(domainObject, latestCallback1, { + strategy: 'latest' + }); + const latestCallback2 = jasmine.createSpy('latestCallback2'); + const unsubscribeFromLatest2 = telemetryAPI.subscribe(domainObject, latestCallback2, { + strategy: 'latest' + }); + + //Expect a single cached subscription for latest telemetry + expect(telemetryProvider.subscribe.calls.count()).toBe(1); + + const batchedCallback1 = jasmine.createSpy('batchedCallback1'); + const unsubscribeFromBatched1 = telemetryAPI.subscribe(domainObject, batchedCallback1, { + strategy: 'batch' + }); + + const batchedCallback2 = jasmine.createSpy('batchedCallback2'); + const unsubscribeFromBatched2 = telemetryAPI.subscribe(domainObject, batchedCallback2, { + strategy: 'batch' + }); + + //Expect a single cached subscription for each strategy telemetry + expect(telemetryProvider.subscribe.calls.count()).toBe(2); + + unsubscribeFromLatest1(); + unsubscribeFromLatest2(); + unsubscribeFromBatched1(); + unsubscribeFromBatched2(); + + expect(unsubFunc).toHaveBeenCalledTimes(2); + }); + it('subscriptions with the latest strategy are always invoked with a single value', () => { + const latestCallback = jasmine.createSpy('latestCallback1'); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); + + const batchedValues = [1, 2, 3]; + callbacks.forEach((cb) => { + cb(batchedValues); + }); + + expect(latestCallback).toHaveBeenCalledWith(3); + + const singleValue = 1; + callbacks.forEach((cb) => { + cb(singleValue); + }); + + expect(latestCallback).toHaveBeenCalledWith(1); + }); + + it('subscriptions with the batch strategy are always invoked with an array', () => { + const batchedCallback = jasmine.createSpy('batchedCallback1'); + const latestCallback = jasmine.createSpy('latestCallback1'); + telemetryAPI.subscribe(domainObject, batchedCallback, { + strategy: 'batch' + }); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); + + const batchedValues = [1, 2, 3]; + callbacks.forEach((cb) => { + cb(batchedValues); + }); + + // Callbacks for the 'batch' strategy are always called with an array of values + expect(batchedCallback).toHaveBeenCalledWith(batchedValues); + // Callbacks for the 'latest' strategy are always called with a single value + expect(latestCallback).toHaveBeenCalledWith(3); + + callbacks.forEach((cb) => { + cb(1); + }); + // Callbacks for the 'batch' strategy are always called with an array of values, even if there is only one value + expect(batchedCallback).toHaveBeenCalledWith([1]); + // Callbacks for the 'latest' strategy are always called with a single value + expect(latestCallback).toHaveBeenCalledWith(1); + }); + + it('legacy providers are left unchanged, with a single subscription', () => { + delete telemetryProvider.supportsBatching; + + const batchCallback = jasmine.createSpy('batchCallback'); + telemetryAPI.subscribe(domainObject, batchCallback, { + strategy: 'batch' + }); + expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest'); + + const latestCallback = jasmine.createSpy('latestCallback'); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); + + expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest'); + }); + }); }); describe('metadata', () => { diff --git a/src/api/telemetry/TelemetryCollection.js b/src/api/telemetry/TelemetryCollection.js index 831c9288c1..4791a04ae0 100644 --- a/src/api/telemetry/TelemetryCollection.js +++ b/src/api/telemetry/TelemetryCollection.js @@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter { if (this.unsubscribe) { this.unsubscribe(); } + const options = { ...this.options }; + //We always want to receive all available values in telemetry tables. + options.strategy = this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH; this.unsubscribe = this.openmct.telemetry.subscribe( this.domainObject, (datum) => this._processNewTelemetry(datum), - this.options + options ); } diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js new file mode 100644 index 0000000000..f09e7d12f7 --- /dev/null +++ b/src/api/telemetry/WebSocketWorker.js @@ -0,0 +1,366 @@ +/***************************************************************************** + * Open MCT Web, Copyright (c) 2014-2024, United States Government + * as represented by the Administrator of the National Aeronautics and Space + * Administration. All rights reserved. + * + * Open MCT Web is licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * Open MCT Web includes source code licensed under additional open source + * licenses. See the Open Source Licenses file (LICENSES.md) included with + * this source code distribution or the Licensing information page available + * at runtime from the About dialog for additional information. + *****************************************************************************/ +/* eslint-disable max-classes-per-file */ +export default function installWorker() { + const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; + + /** + * @typedef {import('./BatchingWebSocket').BatchingStrategy} BatchingStrategy + */ + + /** + * Provides a WebSocket connection that is resilient to errors and dropouts. + * On an error or dropout, will automatically reconnect. + * + * Additionally, messages will be queued and sent only when WebSocket is + * connected meaning that client code does not need to check the state of + * the socket before sending. + */ + class ResilientWebSocket extends EventTarget { + #webSocket; + #isConnected = false; + #isConnecting = false; + #messageQueue = []; + #reconnectTimeoutHandle; + #currentWaitIndex = 0; + #messageCallbacks = []; + #wsUrl; + + /** + * Establish a new WebSocket connection to the given URL + * @param {String} url + */ + connect(url) { + this.#wsUrl = url; + if (this.#isConnected) { + throw new Error('WebSocket already connected'); + } + + if (this.#isConnecting) { + throw new Error('WebSocket connection in progress'); + } + + this.#isConnecting = true; + + this.#webSocket = new WebSocket(url); + + const boundConnected = this.#connected.bind(this); + this.#webSocket.addEventListener('open', boundConnected); + + const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this); + this.#webSocket.addEventListener('error', boundCleanUpAndReconnect); + this.#webSocket.addEventListener('close', boundCleanUpAndReconnect); + + const boundMessage = this.#message.bind(this); + this.#webSocket.addEventListener('message', boundMessage); + + this.addEventListener( + 'disconnected', + () => { + this.#webSocket.removeEventListener('open', boundConnected); + this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect); + this.#webSocket.removeEventListener('close', boundCleanUpAndReconnect); + }, + { once: true } + ); + } + + /** + * Register a callback to be invoked when a message is received on the WebSocket. + * This paradigm is used instead of the standard EventTarget or EventEmitter approach + * for performance reasons. + * @param {Function} callback The function to be invoked when a message is received + * @returns an unregister function + */ + registerMessageCallback(callback) { + this.#messageCallbacks.push(callback); + + return () => { + this.#messageCallbacks = this.#messageCallbacks.filter((cb) => cb !== callback); + }; + } + + #connected() { + console.debug('Websocket connected.'); + this.#isConnected = true; + this.#isConnecting = false; + this.#currentWaitIndex = 0; + + this.dispatchEvent(new Event('connected')); + + this.#flushQueue(); + } + + #cleanUpAndReconnect() { + console.warn('Websocket closed. Attempting to reconnect...'); + this.disconnect(); + this.#reconnect(); + } + + #message(event) { + this.#messageCallbacks.forEach((callback) => callback(event.data)); + } + + disconnect() { + this.#isConnected = false; + this.#isConnecting = false; + + // On WebSocket error, both error callback and close callback are invoked, resulting in + // this function being called twice, and websocket being destroyed and deallocated. + if (this.#webSocket !== undefined && this.#webSocket !== null) { + this.#webSocket.close(); + } + + this.dispatchEvent(new Event('disconnected')); + this.#webSocket = undefined; + } + + #reconnect() { + if (this.#reconnectTimeoutHandle) { + return; + } + + this.#reconnectTimeoutHandle = setTimeout(() => { + this.connect(this.#wsUrl); + + this.#reconnectTimeoutHandle = undefined; + }, FALLBACK_AND_WAIT_MS[this.#currentWaitIndex]); + + if (this.#currentWaitIndex < FALLBACK_AND_WAIT_MS.length - 1) { + this.#currentWaitIndex++; + } + } + + enqueueMessage(message) { + this.#messageQueue.push(message); + this.#flushQueueIfReady(); + } + + #flushQueueIfReady() { + if (this.#isConnected) { + this.#flushQueue(); + } + } + + #flushQueue() { + while (this.#messageQueue.length > 0) { + if (!this.#isConnected) { + break; + } + + const message = this.#messageQueue.shift(); + this.#webSocket.send(message); + } + } + } + + /** + * Handles messages over the worker interface, and + * sends corresponding WebSocket messages. + */ + class WorkerToWebSocketMessageBroker { + #websocket; + #messageBatcher; + + constructor(websocket, messageBatcher) { + this.#websocket = websocket; + this.#messageBatcher = messageBatcher; + } + + routeMessageToHandler(message) { + const { type } = message.data; + switch (type) { + case 'connect': + this.connect(message); + break; + case 'disconnect': + this.disconnect(message); + break; + case 'message': + this.#websocket.enqueueMessage(message.data.message); + break; + case 'setBatchingStrategy': + this.setBatchingStrategy(message); + break; + case 'readyForNextBatch': + this.#messageBatcher.readyForNextBatch(); + break; + case 'setMaxBatchSize': + this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize); + break; + default: + throw new Error(`Unknown message type: ${type}`); + } + } + connect(message) { + const { url } = message.data; + this.#websocket.connect(url); + } + disconnect() { + this.#websocket.disconnect(); + } + setBatchingStrategy(message) { + const { serializedStrategy } = message.data; + const batchingStrategy = { + // eslint-disable-next-line no-new-func + shouldBatchMessage: new Function(`return ${serializedStrategy.shouldBatchMessage}`)(), + // eslint-disable-next-line no-new-func + getBatchIdFromMessage: new Function(`return ${serializedStrategy.getBatchIdFromMessage}`)() + // Will also include maximum batch length here + }; + this.#messageBatcher.setBatchingStrategy(batchingStrategy); + } + } + + /** + * Received messages from the WebSocket, and passes them along to the + * Worker interface and back to the main thread. + */ + class WebSocketToWorkerMessageBroker { + #worker; + #messageBatcher; + + constructor(messageBatcher, worker) { + this.#messageBatcher = messageBatcher; + this.#worker = worker; + } + + routeMessageToHandler(data) { + //Implement batching here + if (this.#messageBatcher.shouldBatchMessage(data)) { + this.#messageBatcher.addMessageToBatch(data); + } else { + this.#worker.postMessage({ + type: 'message', + message: data + }); + } + } + } + + /** + * Responsible for batching messages according to the defined batching strategy. + */ + class MessageBatcher { + #batch; + #batchingStrategy; + #hasBatch = false; + #maxBatchSize; + #readyForNextBatch; + #worker; + + constructor(worker) { + this.#maxBatchSize = 10; + this.#readyForNextBatch = false; + this.#worker = worker; + this.#resetBatch(); + } + #resetBatch() { + this.#batch = {}; + this.#hasBatch = false; + } + /** + * @param {BatchingStrategy} strategy + */ + setBatchingStrategy(strategy) { + this.#batchingStrategy = strategy; + } + /** + * Applies the `shouldBatchMessage` function from the supplied batching strategy + * to each message to determine if it should be added to a batch. If not batched, + * the message is immediately sent over the worker to the main thread. + * @param {any} message the message received from the WebSocket. See the WebSocket + * documentation for more details - + * https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data + * @returns + */ + shouldBatchMessage(message) { + return ( + this.#batchingStrategy.shouldBatchMessage && + this.#batchingStrategy.shouldBatchMessage(message) + ); + } + /** + * Adds the given message to a batch. The batch group that the message is added + * to will be determined by the value returned by `getBatchIdFromMessage`. + * @param {any} message the message received from the WebSocket. See the WebSocket + * documentation for more details - + * https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data + */ + addMessageToBatch(message) { + const batchId = this.#batchingStrategy.getBatchIdFromMessage(message); + let batch = this.#batch[batchId]; + if (batch === undefined) { + batch = this.#batch[batchId] = [message]; + } else { + batch.push(message); + } + if (batch.length > this.#maxBatchSize) { + batch.shift(); + this.#batch.dropped = this.#batch.dropped || true; + } + if (this.#readyForNextBatch) { + this.#sendNextBatch(); + } else { + this.#hasBatch = true; + } + } + setMaxBatchSize(maxBatchSize) { + this.#maxBatchSize = maxBatchSize; + } + /** + * Indicates that client code is ready to receive the next batch of + * messages. If a batch is available, it will be immediately sent. + * Otherwise a flag will be set to send the next batch as soon as + * any new data is available. + */ + readyForNextBatch() { + if (this.#hasBatch) { + this.#sendNextBatch(); + } else { + this.#readyForNextBatch = true; + } + } + #sendNextBatch() { + const batch = this.#batch; + this.#resetBatch(); + this.#worker.postMessage({ + type: 'batch', + batch + }); + this.#readyForNextBatch = false; + this.#hasBatch = false; + } + } + + const websocket = new ResilientWebSocket(); + const messageBatcher = new MessageBatcher(self); + const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); + const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self); + + self.addEventListener('message', (message) => { + workerBroker.routeMessageToHandler(message); + }); + websocket.registerMessageCallback((data) => { + websocketBroker.routeMessageToHandler(data); + }); +} diff --git a/src/plugins/plot/configuration/Model.js b/src/plugins/plot/configuration/Model.js index 390c043228..2a698c4c64 100644 --- a/src/plugins/plot/configuration/Model.js +++ b/src/plugins/plot/configuration/Model.js @@ -139,7 +139,9 @@ export default class Model extends EventEmitter { /** @typedef {any} TODO */ -/** @typedef {TODO} OpenMCT */ +/** + * @typedef {import('../../../../openmct.js').OpenMCT} OpenMCT + */ /** @template {object} T diff --git a/src/plugins/plot/configuration/PlotSeries.js b/src/plugins/plot/configuration/PlotSeries.js index 2d609e1be5..fdc26d277e 100644 --- a/src/plugins/plot/configuration/PlotSeries.js +++ b/src/plugins/plot/configuration/PlotSeries.js @@ -211,9 +211,16 @@ export default class PlotSeries extends Model { ); if (!this.unsubscribe) { - this.unsubscribe = this.openmct.telemetry.subscribe(this.domainObject, this.add.bind(this), { - filters: this.filters - }); + this.unsubscribe = this.openmct.telemetry.subscribe( + this.domainObject, + (data) => { + this.addAll(data, true); + }, + { + filters: this.filters, + strategy: this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH + } + ); } try { @@ -302,9 +309,7 @@ export default class PlotSeries extends Model { this.resetStats(); this.emit('reset'); if (newData) { - newData.forEach(function (point) { - this.add(point, true); - }, this); + this.addAll(newData, true); } } /** @@ -416,14 +421,14 @@ export default class PlotSeries extends Model { * when adding an array of points that are already properly sorted. * * @private - * @param {Object} point a telemetry datum. - * @param {Boolean} [appendOnly] default false, if true will append + * @param {Object} newData a telemetry datum. + * @param {Boolean} [sorted] default false, if true will append * a point to the end without dupe checking. */ - add(point, appendOnly) { + add(newData, sorted = false) { let data = this.getSeriesData(); let insertIndex = data.length; - const currentYVal = this.getYVal(point); + const currentYVal = this.getYVal(newData); const lastYVal = this.getYVal(data[insertIndex - 1]); if (this.isValueInvalid(currentYVal) && this.isValueInvalid(lastYVal)) { @@ -432,22 +437,28 @@ export default class PlotSeries extends Model { return; } - if (!appendOnly) { - insertIndex = this.sortedIndex(point); - if (this.getXVal(data[insertIndex]) === this.getXVal(point)) { + if (!sorted) { + insertIndex = this.sortedIndex(newData); + if (this.getXVal(data[insertIndex]) === this.getXVal(newData)) { return; } - if (this.getXVal(data[insertIndex - 1]) === this.getXVal(point)) { + if (this.getXVal(data[insertIndex - 1]) === this.getXVal(newData)) { return; } } - this.updateStats(point); - point.mctLimitState = this.evaluate(point); - data.splice(insertIndex, 0, point); + this.updateStats(newData); + newData.mctLimitState = this.evaluate(newData); + data.splice(insertIndex, 0, newData); this.updateSeriesData(data); - this.emit('add', point, insertIndex, this); + this.emit('add', newData, insertIndex, this); + } + + addAll(points, sorted = false) { + for (let i = 0; i < points.length; i++) { + this.add(points[i], sorted); + } } /** diff --git a/src/plugins/remoteClock/RemoteClock.js b/src/plugins/remoteClock/RemoteClock.js index 6bffea1513..db50e4b2e2 100644 --- a/src/plugins/remoteClock/RemoteClock.js +++ b/src/plugins/remoteClock/RemoteClock.js @@ -59,26 +59,21 @@ export default class RemoteClock extends DefaultClock { } start() { - this.openmct.objects - .get(this.identifier) - .then((domainObject) => { - // The start method is called when at least one listener registers with the clock. - // When the clock is changed, listeners are unregistered from the clock and the stop method is called. - // Sometimes, the objects.get call above does not resolve before the stop method is called. - // So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock. - if (this.eventNames().length === 0) { - return; - } - this.openmct.time.on('timeSystem', this._timeSystemChange); - this.timeTelemetryObject = domainObject; - this.metadata = this.openmct.telemetry.getMetadata(domainObject); - this._timeSystemChange(); - this._requestLatest(); - this._subscribe(); - }) - .catch((error) => { - throw new Error(error); - }); + this.openmct.objects.get(this.identifier).then((domainObject) => { + // The start method is called when at least one listener registers with the clock. + // When the clock is changed, listeners are unregistered from the clock and the stop method is called. + // Sometimes, the objects.get call above does not resolve before the stop method is called. + // So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock. + if (this.eventNames().length === 0) { + return; + } + this.openmct.time.on('timeSystem', this._timeSystemChange); + this.timeTelemetryObject = domainObject; + this.metadata = this.openmct.telemetry.getMetadata(domainObject); + this._timeSystemChange(); + this._requestLatest(); + this._subscribe(); + }); } stop() {