mirror of
https://github.com/nasa/openmct.git
synced 2025-06-30 04:33:03 +00:00
Compare commits
19 Commits
fix-exampl
...
subscripti
Author | SHA1 | Date | |
---|---|---|---|
4cc0a9e402 | |||
12b921a006 | |||
34b36d544a | |||
e135498401 | |||
c4a3ace027 | |||
ad02ba7ffe | |||
0bdd5efcba | |||
2e3dc4da9a | |||
61edd0f810 | |||
4c86b66624 | |||
f079c3a3b9 | |||
947810b5d7 | |||
c28ced5c29 | |||
e530fd8e8b | |||
74c1cdf468 | |||
0061d162e1 | |||
2f2af0bac5 | |||
a87ffee264 | |||
69b2f05de2 |
194
src/api/telemetry/BatchingWebSocket.js
Normal file
194
src/api/telemetry/BatchingWebSocket.js
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
/*****************************************************************************
|
||||||
|
* Open MCT Web, Copyright (c) 2014-2024, United States Government
|
||||||
|
* as represented by the Administrator of the National Aeronautics and Space
|
||||||
|
* Administration. All rights reserved.
|
||||||
|
*
|
||||||
|
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0.
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
* Open MCT Web includes source code licensed under additional open source
|
||||||
|
* licenses. See the Open Source Licenses file (LICENSES.md) included with
|
||||||
|
* this source code distribution or the Licensing information page available
|
||||||
|
* at runtime from the About dialog for additional information.
|
||||||
|
*****************************************************************************/
|
||||||
|
import installWorker from './WebSocketWorker.js';
|
||||||
|
const DEFAULT_RATE_MS = 1000;
|
||||||
|
/**
|
||||||
|
* Describes the strategy to be used when batching WebSocket messages
|
||||||
|
*
|
||||||
|
* @typedef BatchingStrategy
|
||||||
|
* @property {Function} shouldBatchMessage a function that accepts a single
|
||||||
|
* argument - the raw message received from the websocket. Every message
|
||||||
|
* received will be evaluated against this function so it should be performant.
|
||||||
|
* Note also that this function is executed in a worker, so it must be
|
||||||
|
* completely self-contained with no external dependencies. The function
|
||||||
|
* should return `true` if the message should be batched, and `false` if not.
|
||||||
|
* @property {Function} getBatchIdFromMessage a function that accepts a
|
||||||
|
* single argument - the raw message received from the websocket. Only messages
|
||||||
|
* where `shouldBatchMessage` has evaluated to true will be passed into this
|
||||||
|
* function. The function should return a unique value on which to batch the
|
||||||
|
* messages. For example a telemetry, channel, or parameter identifier.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Provides a reliable and convenient WebSocket abstraction layer that handles
|
||||||
|
* a lot of boilerplate common to managing WebSocket connections such as:
|
||||||
|
* - Establishing a WebSocket connection to a server
|
||||||
|
* - Reconnecting on error, with a fallback strategy
|
||||||
|
* - Queuing messages so that clients can send messages without concern for the current
|
||||||
|
* connection state of the WebSocket.
|
||||||
|
*
|
||||||
|
* The WebSocket that it manages is based in a dedicated worker so that network
|
||||||
|
* concerns are not handled on the main event loop. This allows for performant receipt
|
||||||
|
* and batching of messages without blocking either the UI or server.
|
||||||
|
*
|
||||||
|
* @memberof module:openmct.telemetry
|
||||||
|
*/
|
||||||
|
class BatchingWebSocket extends EventTarget {
|
||||||
|
#worker;
|
||||||
|
#openmct;
|
||||||
|
#showingRateLimitNotification;
|
||||||
|
#rate;
|
||||||
|
|
||||||
|
constructor(openmct) {
|
||||||
|
super();
|
||||||
|
// Install worker, register listeners etc.
|
||||||
|
const workerFunction = `(${installWorker.toString()})()`;
|
||||||
|
const workerBlob = new Blob([workerFunction]);
|
||||||
|
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
|
||||||
|
this.#worker = new Worker(workerUrl);
|
||||||
|
this.#openmct = openmct;
|
||||||
|
this.#showingRateLimitNotification = false;
|
||||||
|
this.#rate = DEFAULT_RATE_MS;
|
||||||
|
|
||||||
|
const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
|
||||||
|
this.#worker.addEventListener('message', routeMessageToHandler);
|
||||||
|
openmct.on(
|
||||||
|
'destroy',
|
||||||
|
() => {
|
||||||
|
this.disconnect();
|
||||||
|
URL.revokeObjectURL(workerUrl);
|
||||||
|
},
|
||||||
|
{ once: true }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Will establish a WebSocket connection to the provided url
|
||||||
|
* @param {string} url The URL to connect to
|
||||||
|
*/
|
||||||
|
connect(url) {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'connect',
|
||||||
|
url
|
||||||
|
});
|
||||||
|
|
||||||
|
this.#readyForNextBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
#readyForNextBatch() {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'readyForNextBatch'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a message to the WebSocket.
|
||||||
|
* @param {any} message The message to send. Can be any type supported by WebSockets.
|
||||||
|
* See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data
|
||||||
|
*/
|
||||||
|
sendMessage(message) {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'message',
|
||||||
|
message
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the strategy used to both decide which raw messages to batch, and how to group
|
||||||
|
* them.
|
||||||
|
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
|
||||||
|
* raw messages from the WebSocket.
|
||||||
|
*/
|
||||||
|
setBatchingStrategy(strategy) {
|
||||||
|
const serializedStrategy = {
|
||||||
|
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
|
||||||
|
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
|
||||||
|
};
|
||||||
|
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'setBatchingStrategy',
|
||||||
|
serializedStrategy
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When using batching, sets the rate at which batches of messages are released.
|
||||||
|
* @param {Number} rate the amount of time to wait, in ms, between batches.
|
||||||
|
*/
|
||||||
|
setRate(rate) {
|
||||||
|
this.#rate = rate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Number} maxBatchSize the maximum length of a batch of messages. For example,
|
||||||
|
* the maximum number of telemetry values to batch before dropping them
|
||||||
|
* Note that this is a fail-safe that is only invoked if performance drops to the
|
||||||
|
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
|
||||||
|
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
|
||||||
|
* most recent telemetry. The user will be informed that telemetry has been dropped.
|
||||||
|
*
|
||||||
|
* This should be specced appropriately for the expected data rate. eg. If telemetry
|
||||||
|
* is received at 10Hz for each telemetry point, then a minimal combination of batch
|
||||||
|
* size and rate is 10 and 1000 respectively. Ideally you would add some margin, so
|
||||||
|
* 15 would probably be a better batch size.
|
||||||
|
*/
|
||||||
|
setMaxBatchSize(maxBatchSize) {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'setMaxBatchSize',
|
||||||
|
maxBatchSize
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disconnect the associated WebSocket. Generally speaking there is no need to call
|
||||||
|
* this manually.
|
||||||
|
*/
|
||||||
|
disconnect() {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'disconnect'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#routeMessageToHandler(message) {
|
||||||
|
if (message.data.type === 'batch') {
|
||||||
|
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
|
||||||
|
const notification = this.#openmct.notifications.alert(
|
||||||
|
'Telemetry dropped due to client rate limiting.',
|
||||||
|
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
|
||||||
|
);
|
||||||
|
this.#showingRateLimitNotification = true;
|
||||||
|
notification.once('minimized', () => {
|
||||||
|
this.#showingRateLimitNotification = false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
|
||||||
|
setTimeout(() => {
|
||||||
|
this.#readyForNextBatch();
|
||||||
|
}, this.#rate);
|
||||||
|
} else if (message.data.type === 'message') {
|
||||||
|
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));
|
||||||
|
} else {
|
||||||
|
throw new Error(`Unknown message type: ${message.data.type}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default BatchingWebSocket;
|
@ -23,6 +23,7 @@
|
|||||||
import objectUtils from 'objectUtils';
|
import objectUtils from 'objectUtils';
|
||||||
|
|
||||||
import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
|
import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
|
||||||
|
import BatchingWebSocket from './BatchingWebSocket.js';
|
||||||
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
|
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
|
||||||
import TelemetryCollection from './TelemetryCollection.js';
|
import TelemetryCollection from './TelemetryCollection.js';
|
||||||
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
|
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
|
||||||
@ -54,6 +55,28 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js';
|
|||||||
* @memberof module:openmct.TelemetryAPI~
|
* @memberof module:openmct.TelemetryAPI~
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Describes and bounds requests for telemetry data.
|
||||||
|
*
|
||||||
|
* @typedef TelemetrySubscriptionOptions
|
||||||
|
* @property {String} [strategy] symbolic identifier directing providers on how
|
||||||
|
* to handle telemetry subscriptions. The default behavior is 'latest' which will
|
||||||
|
* always return a single telemetry value with each callback, and in the event
|
||||||
|
* of throttling will always prioritize the latest data, meaning intermediate
|
||||||
|
* data will be skipped. Alternatively, the `batch` strategy can be used, which
|
||||||
|
* will return all telemetry values since the last callback. This strategy is
|
||||||
|
* useful for cases where intermediate data is important, such as when
|
||||||
|
* rendering a telemetry plot or table. If `batch` is specified, the subscription
|
||||||
|
* callback will be invoked with an Array.
|
||||||
|
*
|
||||||
|
* @memberof module:openmct.TelemetryAPI~
|
||||||
|
*/
|
||||||
|
|
||||||
|
const SUBSCRIBE_STRATEGY = {
|
||||||
|
LATEST: 'latest',
|
||||||
|
BATCH: 'batch'
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utilities for telemetry
|
* Utilities for telemetry
|
||||||
* @interface TelemetryAPI
|
* @interface TelemetryAPI
|
||||||
@ -62,6 +85,10 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js';
|
|||||||
export default class TelemetryAPI {
|
export default class TelemetryAPI {
|
||||||
#isGreedyLAD;
|
#isGreedyLAD;
|
||||||
|
|
||||||
|
get SUBSCRIBE_STRATEGY() {
|
||||||
|
return SUBSCRIBE_STRATEGY;
|
||||||
|
}
|
||||||
|
|
||||||
constructor(openmct) {
|
constructor(openmct) {
|
||||||
this.openmct = openmct;
|
this.openmct = openmct;
|
||||||
|
|
||||||
@ -78,6 +105,7 @@ export default class TelemetryAPI {
|
|||||||
this.valueFormatterCache = new WeakMap();
|
this.valueFormatterCache = new WeakMap();
|
||||||
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
|
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
|
||||||
this.#isGreedyLAD = true;
|
this.#isGreedyLAD = true;
|
||||||
|
this.BatchingWebSocket = BatchingWebSocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
abortAllRequests() {
|
abortAllRequests() {
|
||||||
@ -378,18 +406,23 @@ export default class TelemetryAPI {
|
|||||||
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
|
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
|
||||||
* @param {module:openmct.DomainObject} domainObject the object
|
* @param {module:openmct.DomainObject} domainObject the object
|
||||||
* which has associated telemetry
|
* which has associated telemetry
|
||||||
* @param {TelemetryRequestOptions} options configuration items for subscription
|
* @param {TelemetrySubscriptionOptions} options configuration items for subscription
|
||||||
* @param {Function} callback the callback to invoke with new data, as
|
* @param {Function} callback the callback to invoke with new data, as
|
||||||
* it becomes available
|
* it becomes available
|
||||||
* @returns {Function} a function which may be called to terminate
|
* @returns {Function} a function which may be called to terminate
|
||||||
* the subscription
|
* the subscription
|
||||||
*/
|
*/
|
||||||
subscribe(domainObject, callback, options) {
|
subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) {
|
||||||
if (domainObject.type === 'unknown') {
|
if (domainObject.type === 'unknown') {
|
||||||
return () => {};
|
return () => {};
|
||||||
}
|
}
|
||||||
|
|
||||||
const provider = this.findSubscriptionProvider(domainObject);
|
// Default behavior is to use the latest strategy, as opposed to the new "batch" strategy
|
||||||
|
if (options.strategy === undefined || options.strategy === null) {
|
||||||
|
options.strategy = SUBSCRIBE_STRATEGY.LATEST;
|
||||||
|
}
|
||||||
|
|
||||||
|
const provider = this.findSubscriptionProvider(domainObject, options);
|
||||||
|
|
||||||
if (!this.subscribeCache) {
|
if (!this.subscribeCache) {
|
||||||
this.subscribeCache = {};
|
this.subscribeCache = {};
|
||||||
@ -405,11 +438,9 @@ export default class TelemetryAPI {
|
|||||||
if (provider) {
|
if (provider) {
|
||||||
subscriber.unsubscribe = provider.subscribe(
|
subscriber.unsubscribe = provider.subscribe(
|
||||||
domainObject,
|
domainObject,
|
||||||
function (value) {
|
options.strategy === SUBSCRIBE_STRATEGY.BATCH
|
||||||
subscriber.callbacks.forEach(function (cb) {
|
? subscriptionCallbackForArray
|
||||||
cb(value);
|
: subscriptionCallbackForSingleValue,
|
||||||
});
|
|
||||||
},
|
|
||||||
options
|
options
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
@ -419,6 +450,38 @@ export default class TelemetryAPI {
|
|||||||
subscriber.callbacks.push(callback);
|
subscriber.callbacks.push(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function subscriptionCallbackForArray(value) {
|
||||||
|
if (value === undefined || value === null || value.length === 0) {
|
||||||
|
throw new Error(
|
||||||
|
'Attempt to invoke telemetry subscription callback with no telemetry datum'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!Array.isArray(value)) {
|
||||||
|
value = [value];
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.callbacks.forEach(function (cb) {
|
||||||
|
cb(value);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function subscriptionCallbackForSingleValue(value) {
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
value = value[value.length - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value === undefined || value === null) {
|
||||||
|
throw new Error(
|
||||||
|
'Attempt to invoke telemetry subscription callback with no telemetry datum'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.callbacks.forEach(function (cb) {
|
||||||
|
cb(value);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
return function unsubscribe() {
|
return function unsubscribe() {
|
||||||
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
|
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
|
||||||
return cb !== callback;
|
return cb !== callback;
|
||||||
|
@ -90,7 +90,9 @@ describe('Telemetry API', () => {
|
|||||||
|
|
||||||
const callback = jasmine.createSpy('callback');
|
const callback = jasmine.createSpy('callback');
|
||||||
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
|
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
|
||||||
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
|
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
|
||||||
|
strategy: 'latest'
|
||||||
|
});
|
||||||
expect(telemetryProvider.subscribe).not.toHaveBeenCalled();
|
expect(telemetryProvider.subscribe).not.toHaveBeenCalled();
|
||||||
expect(unsubscribe).toEqual(jasmine.any(Function));
|
expect(unsubscribe).toEqual(jasmine.any(Function));
|
||||||
|
|
||||||
@ -111,12 +113,16 @@ describe('Telemetry API', () => {
|
|||||||
const callback = jasmine.createSpy('callback');
|
const callback = jasmine.createSpy('callback');
|
||||||
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
|
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
|
||||||
expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1);
|
expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1);
|
||||||
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
|
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
|
||||||
|
strategy: 'latest'
|
||||||
|
});
|
||||||
expect(telemetryProvider.subscribe.calls.count()).toBe(1);
|
expect(telemetryProvider.subscribe.calls.count()).toBe(1);
|
||||||
expect(telemetryProvider.subscribe).toHaveBeenCalledWith(
|
expect(telemetryProvider.subscribe).toHaveBeenCalledWith(
|
||||||
domainObject,
|
domainObject,
|
||||||
jasmine.any(Function),
|
jasmine.any(Function),
|
||||||
undefined
|
{
|
||||||
|
strategy: 'latest'
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
const notify = telemetryProvider.subscribe.calls.mostRecent().args[1];
|
const notify = telemetryProvider.subscribe.calls.mostRecent().args[1];
|
||||||
|
@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter {
|
|||||||
if (this.unsubscribe) {
|
if (this.unsubscribe) {
|
||||||
this.unsubscribe();
|
this.unsubscribe();
|
||||||
}
|
}
|
||||||
|
const options = { ...this.options };
|
||||||
|
//We always want to receive all available values in telemetry tables.
|
||||||
|
options.strategy = this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH;
|
||||||
|
|
||||||
this.unsubscribe = this.openmct.telemetry.subscribe(
|
this.unsubscribe = this.openmct.telemetry.subscribe(
|
||||||
this.domainObject,
|
this.domainObject,
|
||||||
(datum) => this._processNewTelemetry(datum),
|
(datum) => this._processNewTelemetry(datum),
|
||||||
this.options
|
options
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
383
src/api/telemetry/WebSocketWorker.js
Normal file
383
src/api/telemetry/WebSocketWorker.js
Normal file
@ -0,0 +1,383 @@
|
|||||||
|
/*****************************************************************************
|
||||||
|
* Open MCT Web, Copyright (c) 2014-2024, United States Government
|
||||||
|
* as represented by the Administrator of the National Aeronautics and Space
|
||||||
|
* Administration. All rights reserved.
|
||||||
|
*
|
||||||
|
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0.
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
* Open MCT Web includes source code licensed under additional open source
|
||||||
|
* licenses. See the Open Source Licenses file (LICENSES.md) included with
|
||||||
|
* this source code distribution or the Licensing information page available
|
||||||
|
* at runtime from the About dialog for additional information.
|
||||||
|
*****************************************************************************/
|
||||||
|
/* eslint-disable max-classes-per-file */
|
||||||
|
export default function installWorker() {
|
||||||
|
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef {import('./BatchingWebSocket').BatchingStrategy} BatchingStrategy
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides a WebSocket connection that is resilient to errors and dropouts.
|
||||||
|
* On an error or dropout, will automatically reconnect.
|
||||||
|
*
|
||||||
|
* Additionally, messages will be queued and sent only when WebSocket is
|
||||||
|
* connected meaning that client code does not need to check the state of
|
||||||
|
* the socket before sending.
|
||||||
|
*/
|
||||||
|
class ResilientWebSocket extends EventTarget {
|
||||||
|
#webSocket;
|
||||||
|
#isConnected = false;
|
||||||
|
#isConnecting = false;
|
||||||
|
#messageQueue = [];
|
||||||
|
#reconnectTimeoutHandle;
|
||||||
|
#currentWaitIndex = 0;
|
||||||
|
#messageCallbacks = [];
|
||||||
|
#wsUrl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Establish a new WebSocket connection to the given URL
|
||||||
|
* @param {String} url
|
||||||
|
*/
|
||||||
|
connect(url) {
|
||||||
|
this.#wsUrl = url;
|
||||||
|
if (this.#isConnected) {
|
||||||
|
throw new Error('WebSocket already connected');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#isConnecting) {
|
||||||
|
throw new Error('WebSocket connection in progress');
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#isConnecting = true;
|
||||||
|
|
||||||
|
//TODO: Make this configurable
|
||||||
|
this.#webSocket = new WebSocket(url, 'protobuf');
|
||||||
|
//TODO: Make this configurable
|
||||||
|
this.#webSocket.binaryType = 'arraybuffer';
|
||||||
|
|
||||||
|
const boundConnected = this.#connected.bind(this);
|
||||||
|
this.#webSocket.addEventListener('open', boundConnected);
|
||||||
|
|
||||||
|
const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this);
|
||||||
|
this.#webSocket.addEventListener('error', boundCleanUpAndReconnect);
|
||||||
|
this.#webSocket.addEventListener('close', boundCleanUpAndReconnect);
|
||||||
|
|
||||||
|
const boundMessage = this.#message.bind(this);
|
||||||
|
this.#webSocket.addEventListener('message', boundMessage);
|
||||||
|
|
||||||
|
this.addEventListener(
|
||||||
|
'disconnected',
|
||||||
|
() => {
|
||||||
|
this.#webSocket.removeEventListener('open', boundConnected);
|
||||||
|
this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect);
|
||||||
|
this.#webSocket.removeEventListener('close', boundCleanUpAndReconnect);
|
||||||
|
},
|
||||||
|
{ once: true }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a callback to be invoked when a message is received on the WebSocket.
|
||||||
|
* This paradigm is used instead of the standard EventTarget or EventEmitter approach
|
||||||
|
* for performance reasons.
|
||||||
|
* @param {Function} callback The function to be invoked when a message is received
|
||||||
|
* @returns an unregister function
|
||||||
|
*/
|
||||||
|
registerMessageCallback(callback) {
|
||||||
|
this.#messageCallbacks.push(callback);
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
this.#messageCallbacks = this.#messageCallbacks.filter((cb) => cb !== callback);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#connected() {
|
||||||
|
console.debug('Websocket connected.');
|
||||||
|
this.#isConnected = true;
|
||||||
|
this.#isConnecting = false;
|
||||||
|
this.#currentWaitIndex = 0;
|
||||||
|
|
||||||
|
this.dispatchEvent(new Event('connected'));
|
||||||
|
|
||||||
|
this.#flushQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
#cleanUpAndReconnect() {
|
||||||
|
console.warn('Websocket closed. Attempting to reconnect...');
|
||||||
|
this.disconnect();
|
||||||
|
this.#reconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
#message(event) {
|
||||||
|
this.#messageCallbacks.forEach((callback) => callback(event.data));
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect() {
|
||||||
|
this.#isConnected = false;
|
||||||
|
this.#isConnecting = false;
|
||||||
|
|
||||||
|
// On WebSocket error, both error callback and close callback are invoked, resulting in
|
||||||
|
// this function being called twice, and websocket being destroyed and deallocated.
|
||||||
|
if (this.#webSocket !== undefined && this.#webSocket !== null) {
|
||||||
|
this.#webSocket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.dispatchEvent(new Event('disconnected'));
|
||||||
|
this.#webSocket = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
#reconnect() {
|
||||||
|
if (this.#reconnectTimeoutHandle) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#reconnectTimeoutHandle = setTimeout(() => {
|
||||||
|
this.connect(this.#wsUrl);
|
||||||
|
|
||||||
|
this.#reconnectTimeoutHandle = undefined;
|
||||||
|
}, FALLBACK_AND_WAIT_MS[this.#currentWaitIndex]);
|
||||||
|
|
||||||
|
if (this.#currentWaitIndex < FALLBACK_AND_WAIT_MS.length - 1) {
|
||||||
|
this.#currentWaitIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueueMessage(message) {
|
||||||
|
this.#messageQueue.push(message);
|
||||||
|
this.#flushQueueIfReady();
|
||||||
|
}
|
||||||
|
|
||||||
|
#flushQueueIfReady() {
|
||||||
|
if (this.#isConnected) {
|
||||||
|
this.#flushQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#flushQueue() {
|
||||||
|
while (this.#messageQueue.length > 0) {
|
||||||
|
if (!this.#isConnected) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = this.#messageQueue.shift();
|
||||||
|
this.#webSocket.send(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles messages over the worker interface, and
|
||||||
|
* sends corresponding WebSocket messages.
|
||||||
|
*/
|
||||||
|
class WorkerToWebSocketMessageBroker {
|
||||||
|
#websocket;
|
||||||
|
#messageBatcher;
|
||||||
|
|
||||||
|
constructor(websocket, messageBatcher) {
|
||||||
|
this.#websocket = websocket;
|
||||||
|
this.#messageBatcher = messageBatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
routeMessageToHandler(message) {
|
||||||
|
const { type } = message.data;
|
||||||
|
switch (type) {
|
||||||
|
case 'connect':
|
||||||
|
this.connect(message);
|
||||||
|
break;
|
||||||
|
case 'disconnect':
|
||||||
|
this.disconnect(message);
|
||||||
|
break;
|
||||||
|
case 'message':
|
||||||
|
this.#websocket.enqueueMessage(message.data.message);
|
||||||
|
break;
|
||||||
|
case 'setBatchingStrategy':
|
||||||
|
this.setBatchingStrategy(message);
|
||||||
|
break;
|
||||||
|
case 'readyForNextBatch':
|
||||||
|
this.#messageBatcher.readyForNextBatch();
|
||||||
|
break;
|
||||||
|
case 'setMaxBatchSize':
|
||||||
|
this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error(`Unknown message type: ${type}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
connect(message) {
|
||||||
|
const { url } = message.data;
|
||||||
|
this.#websocket.connect(url);
|
||||||
|
}
|
||||||
|
disconnect() {
|
||||||
|
this.#websocket.disconnect();
|
||||||
|
}
|
||||||
|
setBatchingStrategy(message) {
|
||||||
|
const { serializedStrategy } = message.data;
|
||||||
|
const batchingStrategy = {
|
||||||
|
// eslint-disable-next-line no-new-func
|
||||||
|
shouldBatchMessage: new Function(`return ${serializedStrategy.shouldBatchMessage}`)(),
|
||||||
|
// eslint-disable-next-line no-new-func
|
||||||
|
getBatchIdFromMessage: new Function(`return ${serializedStrategy.getBatchIdFromMessage}`)()
|
||||||
|
// Will also include maximum batch length here
|
||||||
|
};
|
||||||
|
this.#messageBatcher.setBatchingStrategy(batchingStrategy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Received messages from the WebSocket, and passes them along to the
|
||||||
|
* Worker interface and back to the main thread.
|
||||||
|
*/
|
||||||
|
class WebSocketToWorkerMessageBroker {
|
||||||
|
#worker;
|
||||||
|
#messageBatcher;
|
||||||
|
|
||||||
|
constructor(messageBatcher, worker) {
|
||||||
|
this.#messageBatcher = messageBatcher;
|
||||||
|
this.#worker = worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
routeMessageToHandler(data) {
|
||||||
|
//Implement batching here
|
||||||
|
if (this.#messageBatcher.shouldBatchMessage(data)) {
|
||||||
|
this.#messageBatcher.addMessageToBatch(data);
|
||||||
|
} else {
|
||||||
|
this.#worker.postMessage(
|
||||||
|
{
|
||||||
|
type: 'message',
|
||||||
|
message: data
|
||||||
|
},
|
||||||
|
{
|
||||||
|
transfer: data
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responsible for batching messages according to the defined batching strategy.
|
||||||
|
*/
|
||||||
|
class MessageBatcher {
|
||||||
|
#batch;
|
||||||
|
#batchingStrategy;
|
||||||
|
#hasBatch = false;
|
||||||
|
#maxBatchSize;
|
||||||
|
#readyForNextBatch;
|
||||||
|
#worker;
|
||||||
|
#transferables;
|
||||||
|
|
||||||
|
constructor(worker) {
|
||||||
|
this.#maxBatchSize = 10;
|
||||||
|
this.#readyForNextBatch = false;
|
||||||
|
this.#worker = worker;
|
||||||
|
this.#transferables = [];
|
||||||
|
this.#resetBatch();
|
||||||
|
}
|
||||||
|
#resetBatch() {
|
||||||
|
this.#batch = {};
|
||||||
|
this.#hasBatch = false;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param {BatchingStrategy} strategy
|
||||||
|
*/
|
||||||
|
setBatchingStrategy(strategy) {
|
||||||
|
this.#batchingStrategy = strategy;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Applies the `shouldBatchMessage` function from the supplied batching strategy
|
||||||
|
* to each message to determine if it should be added to a batch. If not batched,
|
||||||
|
* the message is immediately sent over the worker to the main thread.
|
||||||
|
* @param {any} message the message received from the WebSocket. See the WebSocket
|
||||||
|
* documentation for more details -
|
||||||
|
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
|
||||||
|
* @returns
|
||||||
|
*/
|
||||||
|
shouldBatchMessage(message) {
|
||||||
|
return (
|
||||||
|
this.#batchingStrategy.shouldBatchMessage &&
|
||||||
|
this.#batchingStrategy.shouldBatchMessage(message)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Adds the given message to a batch. The batch group that the message is added
|
||||||
|
* to will be determined by the value returned by `getBatchIdFromMessage`.
|
||||||
|
* @param {any} message the message received from the WebSocket. See the WebSocket
|
||||||
|
* documentation for more details -
|
||||||
|
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
|
||||||
|
*/
|
||||||
|
addMessageToBatch(message) {
|
||||||
|
const batchId = this.#batchingStrategy.getBatchIdFromMessage(message);
|
||||||
|
let batch = this.#batch[batchId];
|
||||||
|
if (batch === undefined) {
|
||||||
|
batch = this.#batch[batchId] = [message];
|
||||||
|
} else {
|
||||||
|
batch.push(message);
|
||||||
|
}
|
||||||
|
if (batch.length > this.#maxBatchSize) {
|
||||||
|
batch.shift();
|
||||||
|
this.#batch.dropped = this.#batch.dropped || true;
|
||||||
|
}
|
||||||
|
if (this.#readyForNextBatch) {
|
||||||
|
this.#sendNextBatch();
|
||||||
|
} else {
|
||||||
|
this.#hasBatch = true;
|
||||||
|
}
|
||||||
|
this.#transferables.push(message);
|
||||||
|
}
|
||||||
|
setMaxBatchSize(maxBatchSize) {
|
||||||
|
this.#maxBatchSize = maxBatchSize;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Indicates that client code is ready to receive the next batch of
|
||||||
|
* messages. If a batch is available, it will be immediately sent.
|
||||||
|
* Otherwise a flag will be set to send the next batch as soon as
|
||||||
|
* any new data is available.
|
||||||
|
*/
|
||||||
|
readyForNextBatch() {
|
||||||
|
if (this.#hasBatch) {
|
||||||
|
this.#sendNextBatch();
|
||||||
|
} else {
|
||||||
|
this.#readyForNextBatch = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#sendNextBatch() {
|
||||||
|
const batch = this.#batch;
|
||||||
|
this.#resetBatch();
|
||||||
|
this.#worker.postMessage(
|
||||||
|
{
|
||||||
|
type: 'batch',
|
||||||
|
batch
|
||||||
|
},
|
||||||
|
{
|
||||||
|
transfer: this.#transferables
|
||||||
|
}
|
||||||
|
);
|
||||||
|
this.#readyForNextBatch = false;
|
||||||
|
this.#hasBatch = false;
|
||||||
|
this.#transferables = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const websocket = new ResilientWebSocket();
|
||||||
|
const messageBatcher = new MessageBatcher(self);
|
||||||
|
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher);
|
||||||
|
const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self);
|
||||||
|
|
||||||
|
self.addEventListener('message', (message) => {
|
||||||
|
workerBroker.routeMessageToHandler(message);
|
||||||
|
});
|
||||||
|
websocket.registerMessageCallback((data) => {
|
||||||
|
websocketBroker.routeMessageToHandler(data);
|
||||||
|
});
|
||||||
|
}
|
@ -139,7 +139,9 @@ export default class Model extends EventEmitter {
|
|||||||
|
|
||||||
/** @typedef {any} TODO */
|
/** @typedef {any} TODO */
|
||||||
|
|
||||||
/** @typedef {TODO} OpenMCT */
|
/**
|
||||||
|
* @typedef {import('../../../../openmct.js').OpenMCT} OpenMCT
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@template {object} T
|
@template {object} T
|
||||||
|
@ -211,9 +211,16 @@ export default class PlotSeries extends Model {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (!this.unsubscribe) {
|
if (!this.unsubscribe) {
|
||||||
this.unsubscribe = this.openmct.telemetry.subscribe(this.domainObject, this.add.bind(this), {
|
this.unsubscribe = this.openmct.telemetry.subscribe(
|
||||||
filters: this.filters
|
this.domainObject,
|
||||||
});
|
(data) => {
|
||||||
|
this.addAll(data, true);
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: this.filters,
|
||||||
|
strategy: this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -302,9 +309,7 @@ export default class PlotSeries extends Model {
|
|||||||
this.resetStats();
|
this.resetStats();
|
||||||
this.emit('reset');
|
this.emit('reset');
|
||||||
if (newData) {
|
if (newData) {
|
||||||
newData.forEach(function (point) {
|
this.addAll(newData, true);
|
||||||
this.add(point, true);
|
|
||||||
}, this);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -416,14 +421,14 @@ export default class PlotSeries extends Model {
|
|||||||
* when adding an array of points that are already properly sorted.
|
* when adding an array of points that are already properly sorted.
|
||||||
*
|
*
|
||||||
* @private
|
* @private
|
||||||
* @param {Object} point a telemetry datum.
|
* @param {Object} newData a telemetry datum.
|
||||||
* @param {Boolean} [appendOnly] default false, if true will append
|
* @param {Boolean} [sorted] default false, if true will append
|
||||||
* a point to the end without dupe checking.
|
* a point to the end without dupe checking.
|
||||||
*/
|
*/
|
||||||
add(point, appendOnly) {
|
add(newData, sorted = false) {
|
||||||
let data = this.getSeriesData();
|
let data = this.getSeriesData();
|
||||||
let insertIndex = data.length;
|
let insertIndex = data.length;
|
||||||
const currentYVal = this.getYVal(point);
|
const currentYVal = this.getYVal(newData);
|
||||||
const lastYVal = this.getYVal(data[insertIndex - 1]);
|
const lastYVal = this.getYVal(data[insertIndex - 1]);
|
||||||
|
|
||||||
if (this.isValueInvalid(currentYVal) && this.isValueInvalid(lastYVal)) {
|
if (this.isValueInvalid(currentYVal) && this.isValueInvalid(lastYVal)) {
|
||||||
@ -432,22 +437,28 @@ export default class PlotSeries extends Model {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!appendOnly) {
|
if (!sorted) {
|
||||||
insertIndex = this.sortedIndex(point);
|
insertIndex = this.sortedIndex(newData);
|
||||||
if (this.getXVal(data[insertIndex]) === this.getXVal(point)) {
|
if (this.getXVal(data[insertIndex]) === this.getXVal(newData)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.getXVal(data[insertIndex - 1]) === this.getXVal(point)) {
|
if (this.getXVal(data[insertIndex - 1]) === this.getXVal(newData)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.updateStats(point);
|
this.updateStats(newData);
|
||||||
point.mctLimitState = this.evaluate(point);
|
newData.mctLimitState = this.evaluate(newData);
|
||||||
data.splice(insertIndex, 0, point);
|
data.splice(insertIndex, 0, newData);
|
||||||
this.updateSeriesData(data);
|
this.updateSeriesData(data);
|
||||||
this.emit('add', point, insertIndex, this);
|
this.emit('add', newData, insertIndex, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
addAll(points, sorted = false) {
|
||||||
|
for (let i = 0; i < points.length; i++) {
|
||||||
|
this.add(points[i], sorted);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -59,26 +59,21 @@ export default class RemoteClock extends DefaultClock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
start() {
|
start() {
|
||||||
this.openmct.objects
|
this.openmct.objects.get(this.identifier).then((domainObject) => {
|
||||||
.get(this.identifier)
|
// The start method is called when at least one listener registers with the clock.
|
||||||
.then((domainObject) => {
|
// When the clock is changed, listeners are unregistered from the clock and the stop method is called.
|
||||||
// The start method is called when at least one listener registers with the clock.
|
// Sometimes, the objects.get call above does not resolve before the stop method is called.
|
||||||
// When the clock is changed, listeners are unregistered from the clock and the stop method is called.
|
// So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock.
|
||||||
// Sometimes, the objects.get call above does not resolve before the stop method is called.
|
if (this.eventNames().length === 0) {
|
||||||
// So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock.
|
return;
|
||||||
if (this.eventNames().length === 0) {
|
}
|
||||||
return;
|
this.openmct.time.on('timeSystem', this._timeSystemChange);
|
||||||
}
|
this.timeTelemetryObject = domainObject;
|
||||||
this.openmct.time.on('timeSystem', this._timeSystemChange);
|
this.metadata = this.openmct.telemetry.getMetadata(domainObject);
|
||||||
this.timeTelemetryObject = domainObject;
|
this._timeSystemChange();
|
||||||
this.metadata = this.openmct.telemetry.getMetadata(domainObject);
|
this._requestLatest();
|
||||||
this._timeSystemChange();
|
this._subscribe();
|
||||||
this._requestLatest();
|
});
|
||||||
this._subscribe();
|
|
||||||
})
|
|
||||||
.catch((error) => {
|
|
||||||
throw new Error(error);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
|
Reference in New Issue
Block a user