Support telemetry batching and move WebSocket handling to worker (#7391)

* Support subscription batching from API, Tables, and Plots

* Added batching worker

* Added configurable batch size and throttling rate

* Support batch size based throttling

* Default to latest strategy

* Don't hide original error

* Added copyright statement

* Renamed BatchingWebSocketProvider to BatchingWebSocket

* Adding docs

* renamed class. changed throttling strategy to be driven by the main thread

* Renamed classes

* Added more documentation

* Fixed broken tests

* Addressed review comments

* Clean up and reconnect on websocket close

* Better management of subscription strategies

* Add tests to catch edge cases where two subscribers request different strategies

* Ensure callbacks are invoked with telemetry in the requested format

* Remove console out. Oops

* Fix linting errors
This commit is contained in:
Andrew Henry 2024-01-29 15:17:55 -08:00 committed by GitHub
parent 0eea2e0bbc
commit 5c21c34568
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 847 additions and 62 deletions

View File

@ -493,7 +493,8 @@
"WCAG",
"stackedplot",
"Andale",
"checksnapshots"
"checksnapshots",
"specced"
],
"dictionaries": ["npm", "softwareTerms", "node", "html", "css", "bash", "en_US"],
"ignorePaths": [

View File

@ -0,0 +1,194 @@
/*****************************************************************************
* Open MCT Web, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT Web includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import installWorker from './WebSocketWorker.js';
const DEFAULT_RATE_MS = 1000;
/**
* Describes the strategy to be used when batching WebSocket messages
*
* @typedef BatchingStrategy
* @property {Function} shouldBatchMessage a function that accepts a single
* argument - the raw message received from the websocket. Every message
* received will be evaluated against this function so it should be performant.
* Note also that this function is executed in a worker, so it must be
* completely self-contained with no external dependencies. The function
* should return `true` if the message should be batched, and `false` if not.
* @property {Function} getBatchIdFromMessage a function that accepts a
* single argument - the raw message received from the websocket. Only messages
* where `shouldBatchMessage` has evaluated to true will be passed into this
* function. The function should return a unique value on which to batch the
* messages. For example a telemetry, channel, or parameter identifier.
*/
/**
* Provides a reliable and convenient WebSocket abstraction layer that handles
* a lot of boilerplate common to managing WebSocket connections such as:
* - Establishing a WebSocket connection to a server
* - Reconnecting on error, with a fallback strategy
* - Queuing messages so that clients can send messages without concern for the current
* connection state of the WebSocket.
*
* The WebSocket that it manages is based in a dedicated worker so that network
* concerns are not handled on the main event loop. This allows for performant receipt
* and batching of messages without blocking either the UI or server.
*
* @memberof module:openmct.telemetry
*/
class BatchingWebSocket extends EventTarget {
#worker;
#openmct;
#showingRateLimitNotification;
#rate;
constructor(openmct) {
super();
// Install worker, register listeners etc.
const workerFunction = `(${installWorker.toString()})()`;
const workerBlob = new Blob([workerFunction]);
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
this.#worker = new Worker(workerUrl);
this.#openmct = openmct;
this.#showingRateLimitNotification = false;
this.#rate = DEFAULT_RATE_MS;
const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
this.#worker.addEventListener('message', routeMessageToHandler);
openmct.on(
'destroy',
() => {
this.disconnect();
URL.revokeObjectURL(workerUrl);
},
{ once: true }
);
}
/**
* Will establish a WebSocket connection to the provided url
* @param {string} url The URL to connect to
*/
connect(url) {
this.#worker.postMessage({
type: 'connect',
url
});
this.#readyForNextBatch();
}
#readyForNextBatch() {
this.#worker.postMessage({
type: 'readyForNextBatch'
});
}
/**
* Send a message to the WebSocket.
* @param {any} message The message to send. Can be any type supported by WebSockets.
* See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data
*/
sendMessage(message) {
this.#worker.postMessage({
type: 'message',
message
});
}
/**
* Set the strategy used to both decide which raw messages to batch, and how to group
* them.
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
* raw messages from the WebSocket.
*/
setBatchingStrategy(strategy) {
const serializedStrategy = {
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};
this.#worker.postMessage({
type: 'setBatchingStrategy',
serializedStrategy
});
}
/**
* When using batching, sets the rate at which batches of messages are released.
* @param {Number} rate the amount of time to wait, in ms, between batches.
*/
setRate(rate) {
this.#rate = rate;
}
/**
* @param {Number} maxBatchSize the maximum length of a batch of messages. For example,
* the maximum number of telemetry values to batch before dropping them
* Note that this is a fail-safe that is only invoked if performance drops to the
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
* most recent telemetry. The user will be informed that telemetry has been dropped.
*
* This should be set appropriately for the expected data rate. eg. If telemetry
* is received at 10Hz for each telemetry point, then a minimal combination of batch
* size and rate is 10 and 1000 respectively. Ideally you would add some margin, so
* 15 would probably be a better batch size.
*/
setMaxBatchSize(maxBatchSize) {
this.#worker.postMessage({
type: 'setMaxBatchSize',
maxBatchSize
});
}
/**
* Disconnect the associated WebSocket. Generally speaking there is no need to call
* this manually.
*/
disconnect() {
this.#worker.postMessage({
type: 'disconnect'
});
}
#routeMessageToHandler(message) {
if (message.data.type === 'batch') {
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
const notification = this.#openmct.notifications.alert(
'Telemetry dropped due to client rate limiting.',
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
);
this.#showingRateLimitNotification = true;
notification.once('minimized', () => {
this.#showingRateLimitNotification = false;
});
}
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
setTimeout(() => {
this.#readyForNextBatch();
}, this.#rate);
} else if (message.data.type === 'message') {
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));
} else {
throw new Error(`Unknown message type: ${message.data.type}`);
}
}
}
export default BatchingWebSocket;

View File

@ -23,6 +23,7 @@
import objectUtils from 'objectUtils';
import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
import BatchingWebSocket from './BatchingWebSocket.js';
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
import TelemetryCollection from './TelemetryCollection.js';
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
@ -54,6 +55,28 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js';
* @memberof module:openmct.TelemetryAPI~
*/
/**
* Describes and bounds requests for telemetry data.
*
* @typedef TelemetrySubscriptionOptions
* @property {String} [strategy] symbolic identifier directing providers on how
* to handle telemetry subscriptions. The default behavior is 'latest' which will
* always return a single telemetry value with each callback, and in the event
* of throttling will always prioritize the latest data, meaning intermediate
* data will be skipped. Alternatively, the `batch` strategy can be used, which
* will return all telemetry values since the last callback. This strategy is
* useful for cases where intermediate data is important, such as when
* rendering a telemetry plot or table. If `batch` is specified, the subscription
* callback will be invoked with an Array.
*
* @memberof module:openmct.TelemetryAPI~
*/
const SUBSCRIBE_STRATEGY = {
LATEST: 'latest',
BATCH: 'batch'
};
/**
* Utilities for telemetry
* @interface TelemetryAPI
@ -61,6 +84,11 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js';
*/
export default class TelemetryAPI {
#isGreedyLAD;
#subscribeCache;
get SUBSCRIBE_STRATEGY() {
return SUBSCRIBE_STRATEGY;
}
constructor(openmct) {
this.openmct = openmct;
@ -78,6 +106,8 @@ export default class TelemetryAPI {
this.valueFormatterCache = new WeakMap();
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
this.#isGreedyLAD = true;
this.BatchingWebSocket = BatchingWebSocket;
this.#subscribeCache = {};
}
abortAllRequests() {
@ -378,54 +408,111 @@ export default class TelemetryAPI {
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
* @param {module:openmct.DomainObject} domainObject the object
* which has associated telemetry
* @param {TelemetryRequestOptions} options configuration items for subscription
* @param {TelemetrySubscriptionOptions} options configuration items for subscription
* @param {Function} callback the callback to invoke with new data, as
* it becomes available
* @returns {Function} a function which may be called to terminate
* the subscription
*/
subscribe(domainObject, callback, options) {
subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) {
const requestedStrategy = options.strategy || SUBSCRIBE_STRATEGY.LATEST;
if (domainObject.type === 'unknown') {
return () => {};
}
const provider = this.findSubscriptionProvider(domainObject);
const provider = this.findSubscriptionProvider(domainObject, options);
const supportsBatching =
Boolean(provider?.supportsBatching) && provider?.supportsBatching(domainObject, options);
if (!this.subscribeCache) {
this.subscribeCache = {};
if (!this.#subscribeCache) {
this.#subscribeCache = {};
}
const keyString = objectUtils.makeKeyString(domainObject.identifier);
let subscriber = this.subscribeCache[keyString];
const supportedStrategy = supportsBatching ? requestedStrategy : SUBSCRIBE_STRATEGY.LATEST;
// Override the requested strategy with the strategy supported by the provider
const optionsWithSupportedStrategy = {
...options,
strategy: supportedStrategy
};
// If batching is supported, we need to cache a subscription for each strategy -
// latest and batched.
const cacheKey = `${keyString}:${supportedStrategy}`;
let subscriber = this.#subscribeCache[cacheKey];
if (!subscriber) {
subscriber = this.subscribeCache[keyString] = {
callbacks: [callback]
subscriber = this.#subscribeCache[cacheKey] = {
latestCallbacks: [],
batchCallbacks: []
};
if (provider) {
subscriber.unsubscribe = provider.subscribe(
domainObject,
function (value) {
subscriber.callbacks.forEach(function (cb) {
cb(value);
});
},
options
invokeCallbackWithRequestedStrategy,
optionsWithSupportedStrategy
);
} else {
subscriber.unsubscribe = function () {};
}
}
if (requestedStrategy === SUBSCRIBE_STRATEGY.BATCH) {
subscriber.batchCallbacks.push(callback);
} else {
subscriber.callbacks.push(callback);
subscriber.latestCallbacks.push(callback);
}
// Guarantees that view receive telemetry in the expected form
function invokeCallbackWithRequestedStrategy(data) {
invokeCallbacksWithArray(data, subscriber.batchCallbacks);
invokeCallbacksWithSingleValue(data, subscriber.latestCallbacks);
}
function invokeCallbacksWithArray(data, batchCallbacks) {
//
if (data === undefined || data === null || data.length === 0) {
throw new Error(
'Attempt to invoke telemetry subscription callback with no telemetry datum'
);
}
if (!Array.isArray(data)) {
data = [data];
}
batchCallbacks.forEach((cb) => {
cb(data);
});
}
function invokeCallbacksWithSingleValue(data, latestCallbacks) {
if (Array.isArray(data)) {
data = data[data.length - 1];
}
if (data === undefined || data === null) {
throw new Error(
'Attempt to invoke telemetry subscription callback with no telemetry datum'
);
}
latestCallbacks.forEach((cb) => {
cb(data);
});
}
return function unsubscribe() {
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
subscriber.latestCallbacks = subscriber.latestCallbacks.filter(function (cb) {
return cb !== callback;
});
if (subscriber.callbacks.length === 0) {
subscriber.batchCallbacks = subscriber.batchCallbacks.filter(function (cb) {
return cb !== callback;
});
if (subscriber.latestCallbacks.length === 0 && subscriber.batchCallbacks.length === 0) {
subscriber.unsubscribe();
delete this.subscribeCache[keyString];
delete this.#subscribeCache[cacheKey];
}
}.bind(this);
}

View File

@ -90,7 +90,9 @@ describe('Telemetry API', () => {
const callback = jasmine.createSpy('callback');
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
strategy: 'latest'
});
expect(telemetryProvider.subscribe).not.toHaveBeenCalled();
expect(unsubscribe).toEqual(jasmine.any(Function));
@ -111,12 +113,16 @@ describe('Telemetry API', () => {
const callback = jasmine.createSpy('callback');
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
strategy: 'latest'
});
expect(telemetryProvider.subscribe.calls.count()).toBe(1);
expect(telemetryProvider.subscribe).toHaveBeenCalledWith(
domainObject,
jasmine.any(Function),
undefined
{
strategy: 'latest'
}
);
const notify = telemetryProvider.subscribe.calls.mostRecent().args[1];
@ -321,6 +327,126 @@ describe('Telemetry API', () => {
signal
});
});
describe('telemetry batching support', () => {
let callbacks;
let unsubFunc;
beforeEach(() => {
callbacks = [];
unsubFunc = jasmine.createSpy('unsubscribe');
telemetryProvider.supportsBatching = jasmine.createSpy('supportsBatching');
telemetryProvider.supportsBatching.and.returnValue(true);
telemetryProvider.supportsSubscribe.and.returnValue(true);
telemetryProvider.subscribe.and.callFake(function (obj, cb, options) {
callbacks.push(cb);
return unsubFunc;
});
telemetryAPI.addProvider(telemetryProvider);
});
it('caches subscriptions for batched and latest telemetry subscriptions', () => {
const latestCallback1 = jasmine.createSpy('latestCallback1');
const unsubscribeFromLatest1 = telemetryAPI.subscribe(domainObject, latestCallback1, {
strategy: 'latest'
});
const latestCallback2 = jasmine.createSpy('latestCallback2');
const unsubscribeFromLatest2 = telemetryAPI.subscribe(domainObject, latestCallback2, {
strategy: 'latest'
});
//Expect a single cached subscription for latest telemetry
expect(telemetryProvider.subscribe.calls.count()).toBe(1);
const batchedCallback1 = jasmine.createSpy('batchedCallback1');
const unsubscribeFromBatched1 = telemetryAPI.subscribe(domainObject, batchedCallback1, {
strategy: 'batch'
});
const batchedCallback2 = jasmine.createSpy('batchedCallback2');
const unsubscribeFromBatched2 = telemetryAPI.subscribe(domainObject, batchedCallback2, {
strategy: 'batch'
});
//Expect a single cached subscription for each strategy telemetry
expect(telemetryProvider.subscribe.calls.count()).toBe(2);
unsubscribeFromLatest1();
unsubscribeFromLatest2();
unsubscribeFromBatched1();
unsubscribeFromBatched2();
expect(unsubFunc).toHaveBeenCalledTimes(2);
});
it('subscriptions with the latest strategy are always invoked with a single value', () => {
const latestCallback = jasmine.createSpy('latestCallback1');
telemetryAPI.subscribe(domainObject, latestCallback, {
strategy: 'latest'
});
const batchedValues = [1, 2, 3];
callbacks.forEach((cb) => {
cb(batchedValues);
});
expect(latestCallback).toHaveBeenCalledWith(3);
const singleValue = 1;
callbacks.forEach((cb) => {
cb(singleValue);
});
expect(latestCallback).toHaveBeenCalledWith(1);
});
it('subscriptions with the batch strategy are always invoked with an array', () => {
const batchedCallback = jasmine.createSpy('batchedCallback1');
const latestCallback = jasmine.createSpy('latestCallback1');
telemetryAPI.subscribe(domainObject, batchedCallback, {
strategy: 'batch'
});
telemetryAPI.subscribe(domainObject, latestCallback, {
strategy: 'latest'
});
const batchedValues = [1, 2, 3];
callbacks.forEach((cb) => {
cb(batchedValues);
});
// Callbacks for the 'batch' strategy are always called with an array of values
expect(batchedCallback).toHaveBeenCalledWith(batchedValues);
// Callbacks for the 'latest' strategy are always called with a single value
expect(latestCallback).toHaveBeenCalledWith(3);
callbacks.forEach((cb) => {
cb(1);
});
// Callbacks for the 'batch' strategy are always called with an array of values, even if there is only one value
expect(batchedCallback).toHaveBeenCalledWith([1]);
// Callbacks for the 'latest' strategy are always called with a single value
expect(latestCallback).toHaveBeenCalledWith(1);
});
it('legacy providers are left unchanged, with a single subscription', () => {
delete telemetryProvider.supportsBatching;
const batchCallback = jasmine.createSpy('batchCallback');
telemetryAPI.subscribe(domainObject, batchCallback, {
strategy: 'batch'
});
expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest');
const latestCallback = jasmine.createSpy('latestCallback');
telemetryAPI.subscribe(domainObject, latestCallback, {
strategy: 'latest'
});
expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest');
});
});
});
describe('metadata', () => {

View File

@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter {
if (this.unsubscribe) {
this.unsubscribe();
}
const options = { ...this.options };
//We always want to receive all available values in telemetry tables.
options.strategy = this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH;
this.unsubscribe = this.openmct.telemetry.subscribe(
this.domainObject,
(datum) => this._processNewTelemetry(datum),
this.options
options
);
}

View File

@ -0,0 +1,366 @@
/*****************************************************************************
* Open MCT Web, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT Web includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
/* eslint-disable max-classes-per-file */
export default function installWorker() {
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
/**
* @typedef {import('./BatchingWebSocket').BatchingStrategy} BatchingStrategy
*/
/**
* Provides a WebSocket connection that is resilient to errors and dropouts.
* On an error or dropout, will automatically reconnect.
*
* Additionally, messages will be queued and sent only when WebSocket is
* connected meaning that client code does not need to check the state of
* the socket before sending.
*/
class ResilientWebSocket extends EventTarget {
#webSocket;
#isConnected = false;
#isConnecting = false;
#messageQueue = [];
#reconnectTimeoutHandle;
#currentWaitIndex = 0;
#messageCallbacks = [];
#wsUrl;
/**
* Establish a new WebSocket connection to the given URL
* @param {String} url
*/
connect(url) {
this.#wsUrl = url;
if (this.#isConnected) {
throw new Error('WebSocket already connected');
}
if (this.#isConnecting) {
throw new Error('WebSocket connection in progress');
}
this.#isConnecting = true;
this.#webSocket = new WebSocket(url);
const boundConnected = this.#connected.bind(this);
this.#webSocket.addEventListener('open', boundConnected);
const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this);
this.#webSocket.addEventListener('error', boundCleanUpAndReconnect);
this.#webSocket.addEventListener('close', boundCleanUpAndReconnect);
const boundMessage = this.#message.bind(this);
this.#webSocket.addEventListener('message', boundMessage);
this.addEventListener(
'disconnected',
() => {
this.#webSocket.removeEventListener('open', boundConnected);
this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect);
this.#webSocket.removeEventListener('close', boundCleanUpAndReconnect);
},
{ once: true }
);
}
/**
* Register a callback to be invoked when a message is received on the WebSocket.
* This paradigm is used instead of the standard EventTarget or EventEmitter approach
* for performance reasons.
* @param {Function} callback The function to be invoked when a message is received
* @returns an unregister function
*/
registerMessageCallback(callback) {
this.#messageCallbacks.push(callback);
return () => {
this.#messageCallbacks = this.#messageCallbacks.filter((cb) => cb !== callback);
};
}
#connected() {
console.debug('Websocket connected.');
this.#isConnected = true;
this.#isConnecting = false;
this.#currentWaitIndex = 0;
this.dispatchEvent(new Event('connected'));
this.#flushQueue();
}
#cleanUpAndReconnect() {
console.warn('Websocket closed. Attempting to reconnect...');
this.disconnect();
this.#reconnect();
}
#message(event) {
this.#messageCallbacks.forEach((callback) => callback(event.data));
}
disconnect() {
this.#isConnected = false;
this.#isConnecting = false;
// On WebSocket error, both error callback and close callback are invoked, resulting in
// this function being called twice, and websocket being destroyed and deallocated.
if (this.#webSocket !== undefined && this.#webSocket !== null) {
this.#webSocket.close();
}
this.dispatchEvent(new Event('disconnected'));
this.#webSocket = undefined;
}
#reconnect() {
if (this.#reconnectTimeoutHandle) {
return;
}
this.#reconnectTimeoutHandle = setTimeout(() => {
this.connect(this.#wsUrl);
this.#reconnectTimeoutHandle = undefined;
}, FALLBACK_AND_WAIT_MS[this.#currentWaitIndex]);
if (this.#currentWaitIndex < FALLBACK_AND_WAIT_MS.length - 1) {
this.#currentWaitIndex++;
}
}
enqueueMessage(message) {
this.#messageQueue.push(message);
this.#flushQueueIfReady();
}
#flushQueueIfReady() {
if (this.#isConnected) {
this.#flushQueue();
}
}
#flushQueue() {
while (this.#messageQueue.length > 0) {
if (!this.#isConnected) {
break;
}
const message = this.#messageQueue.shift();
this.#webSocket.send(message);
}
}
}
/**
* Handles messages over the worker interface, and
* sends corresponding WebSocket messages.
*/
class WorkerToWebSocketMessageBroker {
#websocket;
#messageBatcher;
constructor(websocket, messageBatcher) {
this.#websocket = websocket;
this.#messageBatcher = messageBatcher;
}
routeMessageToHandler(message) {
const { type } = message.data;
switch (type) {
case 'connect':
this.connect(message);
break;
case 'disconnect':
this.disconnect(message);
break;
case 'message':
this.#websocket.enqueueMessage(message.data.message);
break;
case 'setBatchingStrategy':
this.setBatchingStrategy(message);
break;
case 'readyForNextBatch':
this.#messageBatcher.readyForNextBatch();
break;
case 'setMaxBatchSize':
this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize);
break;
default:
throw new Error(`Unknown message type: ${type}`);
}
}
connect(message) {
const { url } = message.data;
this.#websocket.connect(url);
}
disconnect() {
this.#websocket.disconnect();
}
setBatchingStrategy(message) {
const { serializedStrategy } = message.data;
const batchingStrategy = {
// eslint-disable-next-line no-new-func
shouldBatchMessage: new Function(`return ${serializedStrategy.shouldBatchMessage}`)(),
// eslint-disable-next-line no-new-func
getBatchIdFromMessage: new Function(`return ${serializedStrategy.getBatchIdFromMessage}`)()
// Will also include maximum batch length here
};
this.#messageBatcher.setBatchingStrategy(batchingStrategy);
}
}
/**
* Received messages from the WebSocket, and passes them along to the
* Worker interface and back to the main thread.
*/
class WebSocketToWorkerMessageBroker {
#worker;
#messageBatcher;
constructor(messageBatcher, worker) {
this.#messageBatcher = messageBatcher;
this.#worker = worker;
}
routeMessageToHandler(data) {
//Implement batching here
if (this.#messageBatcher.shouldBatchMessage(data)) {
this.#messageBatcher.addMessageToBatch(data);
} else {
this.#worker.postMessage({
type: 'message',
message: data
});
}
}
}
/**
* Responsible for batching messages according to the defined batching strategy.
*/
class MessageBatcher {
#batch;
#batchingStrategy;
#hasBatch = false;
#maxBatchSize;
#readyForNextBatch;
#worker;
constructor(worker) {
this.#maxBatchSize = 10;
this.#readyForNextBatch = false;
this.#worker = worker;
this.#resetBatch();
}
#resetBatch() {
this.#batch = {};
this.#hasBatch = false;
}
/**
* @param {BatchingStrategy} strategy
*/
setBatchingStrategy(strategy) {
this.#batchingStrategy = strategy;
}
/**
* Applies the `shouldBatchMessage` function from the supplied batching strategy
* to each message to determine if it should be added to a batch. If not batched,
* the message is immediately sent over the worker to the main thread.
* @param {any} message the message received from the WebSocket. See the WebSocket
* documentation for more details -
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
* @returns
*/
shouldBatchMessage(message) {
return (
this.#batchingStrategy.shouldBatchMessage &&
this.#batchingStrategy.shouldBatchMessage(message)
);
}
/**
* Adds the given message to a batch. The batch group that the message is added
* to will be determined by the value returned by `getBatchIdFromMessage`.
* @param {any} message the message received from the WebSocket. See the WebSocket
* documentation for more details -
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
*/
addMessageToBatch(message) {
const batchId = this.#batchingStrategy.getBatchIdFromMessage(message);
let batch = this.#batch[batchId];
if (batch === undefined) {
batch = this.#batch[batchId] = [message];
} else {
batch.push(message);
}
if (batch.length > this.#maxBatchSize) {
batch.shift();
this.#batch.dropped = this.#batch.dropped || true;
}
if (this.#readyForNextBatch) {
this.#sendNextBatch();
} else {
this.#hasBatch = true;
}
}
setMaxBatchSize(maxBatchSize) {
this.#maxBatchSize = maxBatchSize;
}
/**
* Indicates that client code is ready to receive the next batch of
* messages. If a batch is available, it will be immediately sent.
* Otherwise a flag will be set to send the next batch as soon as
* any new data is available.
*/
readyForNextBatch() {
if (this.#hasBatch) {
this.#sendNextBatch();
} else {
this.#readyForNextBatch = true;
}
}
#sendNextBatch() {
const batch = this.#batch;
this.#resetBatch();
this.#worker.postMessage({
type: 'batch',
batch
});
this.#readyForNextBatch = false;
this.#hasBatch = false;
}
}
const websocket = new ResilientWebSocket();
const messageBatcher = new MessageBatcher(self);
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher);
const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self);
self.addEventListener('message', (message) => {
workerBroker.routeMessageToHandler(message);
});
websocket.registerMessageCallback((data) => {
websocketBroker.routeMessageToHandler(data);
});
}

View File

@ -139,7 +139,9 @@ export default class Model extends EventEmitter {
/** @typedef {any} TODO */
/** @typedef {TODO} OpenMCT */
/**
* @typedef {import('../../../../openmct.js').OpenMCT} OpenMCT
*/
/**
@template {object} T

View File

@ -211,9 +211,16 @@ export default class PlotSeries extends Model {
);
if (!this.unsubscribe) {
this.unsubscribe = this.openmct.telemetry.subscribe(this.domainObject, this.add.bind(this), {
filters: this.filters
});
this.unsubscribe = this.openmct.telemetry.subscribe(
this.domainObject,
(data) => {
this.addAll(data, true);
},
{
filters: this.filters,
strategy: this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH
}
);
}
try {
@ -302,9 +309,7 @@ export default class PlotSeries extends Model {
this.resetStats();
this.emit('reset');
if (newData) {
newData.forEach(function (point) {
this.add(point, true);
}, this);
this.addAll(newData, true);
}
}
/**
@ -416,14 +421,14 @@ export default class PlotSeries extends Model {
* when adding an array of points that are already properly sorted.
*
* @private
* @param {Object} point a telemetry datum.
* @param {Boolean} [appendOnly] default false, if true will append
* @param {Object} newData a telemetry datum.
* @param {Boolean} [sorted] default false, if true will append
* a point to the end without dupe checking.
*/
add(point, appendOnly) {
add(newData, sorted = false) {
let data = this.getSeriesData();
let insertIndex = data.length;
const currentYVal = this.getYVal(point);
const currentYVal = this.getYVal(newData);
const lastYVal = this.getYVal(data[insertIndex - 1]);
if (this.isValueInvalid(currentYVal) && this.isValueInvalid(lastYVal)) {
@ -432,22 +437,28 @@ export default class PlotSeries extends Model {
return;
}
if (!appendOnly) {
insertIndex = this.sortedIndex(point);
if (this.getXVal(data[insertIndex]) === this.getXVal(point)) {
if (!sorted) {
insertIndex = this.sortedIndex(newData);
if (this.getXVal(data[insertIndex]) === this.getXVal(newData)) {
return;
}
if (this.getXVal(data[insertIndex - 1]) === this.getXVal(point)) {
if (this.getXVal(data[insertIndex - 1]) === this.getXVal(newData)) {
return;
}
}
this.updateStats(point);
point.mctLimitState = this.evaluate(point);
data.splice(insertIndex, 0, point);
this.updateStats(newData);
newData.mctLimitState = this.evaluate(newData);
data.splice(insertIndex, 0, newData);
this.updateSeriesData(data);
this.emit('add', point, insertIndex, this);
this.emit('add', newData, insertIndex, this);
}
addAll(points, sorted = false) {
for (let i = 0; i < points.length; i++) {
this.add(points[i], sorted);
}
}
/**

View File

@ -59,26 +59,21 @@ export default class RemoteClock extends DefaultClock {
}
start() {
this.openmct.objects
.get(this.identifier)
.then((domainObject) => {
// The start method is called when at least one listener registers with the clock.
// When the clock is changed, listeners are unregistered from the clock and the stop method is called.
// Sometimes, the objects.get call above does not resolve before the stop method is called.
// So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock.
if (this.eventNames().length === 0) {
return;
}
this.openmct.time.on('timeSystem', this._timeSystemChange);
this.timeTelemetryObject = domainObject;
this.metadata = this.openmct.telemetry.getMetadata(domainObject);
this._timeSystemChange();
this._requestLatest();
this._subscribe();
})
.catch((error) => {
throw new Error(error);
});
this.openmct.objects.get(this.identifier).then((domainObject) => {
// The start method is called when at least one listener registers with the clock.
// When the clock is changed, listeners are unregistered from the clock and the stop method is called.
// Sometimes, the objects.get call above does not resolve before the stop method is called.
// So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock.
if (this.eventNames().length === 0) {
return;
}
this.openmct.time.on('timeSystem', this._timeSystemChange);
this.timeTelemetryObject = domainObject;
this.metadata = this.openmct.telemetry.getMetadata(domainObject);
this._timeSystemChange();
this._requestLatest();
this._subscribe();
});
}
stop() {