mirror of
https://github.com/nasa/openmct.git
synced 2024-12-18 20:57:53 +00:00
Improve telemetry buffering implementation (#7837)
* Simplifies the implementation of telemetry buffering in Open MCT. * Switches from per-parameter buffering to a shared queue. Per-parameter buffering is too easily overwhelmed by bursts of telemetry from high-frequency parameters. A single shared buffer has more overhead to grow when a burst arrives without overflowing, because the buffer is shared across all parameters. * Removes the need for plugins to pass serialized code to a worker. * Switched to a "one-size-fits-all" batching strategy removing the need for plugins to define a batching strategy at all. * Captures buffering statistics for display in the PerformanceIndicator.
This commit is contained in:
parent
c498f7d20c
commit
29f1956d1a
@ -0,0 +1,72 @@
|
|||||||
|
/*****************************************************************************
|
||||||
|
* Open MCT, Copyright (c) 2014-2023, United States Government
|
||||||
|
* as represented by the Administrator of the National Aeronautics and Space
|
||||||
|
* Administration. All rights reserved.
|
||||||
|
*
|
||||||
|
* Open MCT is licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0.
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
* Open MCT includes source code licensed under additional open source
|
||||||
|
* licenses. See the Open Source Licenses file (LICENSES.md) included with
|
||||||
|
* this source code distribution or the Licensing information page available
|
||||||
|
* at runtime from the About dialog for additional information.
|
||||||
|
*****************************************************************************/
|
||||||
|
import { expect, test } from '../../../../pluginFixtures.js';
|
||||||
|
|
||||||
|
test.describe('The performance indicator', () => {
|
||||||
|
test.beforeEach(async ({ page }) => {
|
||||||
|
await page.goto('./', { waitUntil: 'domcontentloaded' });
|
||||||
|
await page.evaluate(() => {
|
||||||
|
const openmct = window.openmct;
|
||||||
|
openmct.install(openmct.plugins.PerformanceIndicator());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('can be installed', ({ page }) => {
|
||||||
|
const performanceIndicator = page.getByTitle('Performance Indicator');
|
||||||
|
expect(performanceIndicator).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Shows a numerical FPS value', async ({ page }) => {
|
||||||
|
// Frames Per Second. We need to wait at least 1 second to get a value.
|
||||||
|
// eslint-disable-next-line playwright/no-wait-for-timeout
|
||||||
|
await page.waitForTimeout(1000);
|
||||||
|
await expect(page.getByTitle('Performance Indicator')).toHaveText(/\d\d? fps/);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Supports showing optional extended performance information in an overlay for debugging', async ({
|
||||||
|
page
|
||||||
|
}) => {
|
||||||
|
const performanceMeasurementLabel = 'Some measurement';
|
||||||
|
const performanceMeasurementValue = 'Some value';
|
||||||
|
|
||||||
|
await page.evaluate(
|
||||||
|
({ performanceMeasurementLabel: label, performanceMeasurementValue: value }) => {
|
||||||
|
const openmct = window.openmct;
|
||||||
|
openmct.performance.measurements.set(label, value);
|
||||||
|
},
|
||||||
|
{ performanceMeasurementLabel, performanceMeasurementValue }
|
||||||
|
);
|
||||||
|
const performanceIndicator = page.getByTitle('Performance Indicator');
|
||||||
|
await performanceIndicator.click();
|
||||||
|
//Performance overlay is a crude debugging tool, it's evaluated once per second.
|
||||||
|
// eslint-disable-next-line playwright/no-wait-for-timeout
|
||||||
|
await page.waitForTimeout(1000);
|
||||||
|
const performanceOverlay = page.getByTitle('Performance Overlay');
|
||||||
|
await expect(performanceOverlay).toBeVisible();
|
||||||
|
await expect(performanceOverlay).toHaveText(new RegExp(`${performanceMeasurementLabel}.*`));
|
||||||
|
await expect(performanceOverlay).toHaveText(new RegExp(`.*${performanceMeasurementValue}`));
|
||||||
|
|
||||||
|
//Confirm that it disappears if we click on it again.
|
||||||
|
await performanceIndicator.click();
|
||||||
|
await expect(performanceOverlay).toBeHidden();
|
||||||
|
});
|
||||||
|
});
|
@ -20,25 +20,46 @@
|
|||||||
* at runtime from the About dialog for additional information.
|
* at runtime from the About dialog for additional information.
|
||||||
*****************************************************************************/
|
*****************************************************************************/
|
||||||
import installWorker from './WebSocketWorker.js';
|
import installWorker from './WebSocketWorker.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Describes the strategy to be used when batching WebSocket messages
|
* @typedef RequestIdleCallbackOptions
|
||||||
*
|
* @prop {Number} timeout If the number of milliseconds represented by this
|
||||||
* @typedef BatchingStrategy
|
* parameter has elapsed and the callback has not already been called, invoke
|
||||||
* @property {Function} shouldBatchMessage a function that accepts a single
|
* the callback.
|
||||||
* argument - the raw message received from the websocket. Every message
|
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
|
||||||
* received will be evaluated against this function so it should be performant.
|
|
||||||
* Note also that this function is executed in a worker, so it must be
|
|
||||||
* completely self-contained with no external dependencies. The function
|
|
||||||
* should return `true` if the message should be batched, and `false` if not.
|
|
||||||
* @property {Function} getBatchIdFromMessage a function that accepts a
|
|
||||||
* single argument - the raw message received from the websocket. Only messages
|
|
||||||
* where `shouldBatchMessage` has evaluated to true will be passed into this
|
|
||||||
* function. The function should return a unique value on which to batch the
|
|
||||||
* messages. For example a telemetry, channel, or parameter identifier.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides a reliable and convenient WebSocket abstraction layer that handles
|
* Mocks requestIdleCallback for Safari using setTimeout. Functionality will be
|
||||||
* a lot of boilerplate common to managing WebSocket connections such as:
|
* identical to setTimeout in Safari, which is to fire the callback function
|
||||||
|
* after the provided timeout period.
|
||||||
|
*
|
||||||
|
* In browsers that support requestIdleCallback, this const is just a
|
||||||
|
* pointer to the native function.
|
||||||
|
*
|
||||||
|
* @param {Function} callback a callback to be invoked during the next idle period, or
|
||||||
|
* after the specified timeout
|
||||||
|
* @param {RequestIdleCallbackOptions} options
|
||||||
|
* @see https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
function requestIdleCallbackPolyfill(callback, options) {
|
||||||
|
return (
|
||||||
|
// eslint-disable-next-line compat/compat
|
||||||
|
window.requestIdleCallback ??
|
||||||
|
((fn, { timeout }) =>
|
||||||
|
setTimeout(() => {
|
||||||
|
fn({ didTimeout: false });
|
||||||
|
}, timeout))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const requestIdleCallback = requestIdleCallbackPolyfill();
|
||||||
|
|
||||||
|
const ONE_SECOND = 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides a WebSocket abstraction layer that handles a lot of boilerplate common
|
||||||
|
* to managing WebSocket connections such as:
|
||||||
* - Establishing a WebSocket connection to a server
|
* - Establishing a WebSocket connection to a server
|
||||||
* - Reconnecting on error, with a fallback strategy
|
* - Reconnecting on error, with a fallback strategy
|
||||||
* - Queuing messages so that clients can send messages without concern for the current
|
* - Queuing messages so that clients can send messages without concern for the current
|
||||||
@ -49,22 +70,19 @@ import installWorker from './WebSocketWorker.js';
|
|||||||
* and batching of messages without blocking either the UI or server.
|
* and batching of messages without blocking either the UI or server.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
// Shim for Internet Explorer, I mean Safari. It doesn't support requestIdleCallback, but it's in a tech preview, so it will be dropping soon.
|
|
||||||
const requestIdleCallback =
|
|
||||||
// eslint-disable-next-line compat/compat
|
|
||||||
window.requestIdleCallback ?? ((fn, { timeout }) => setTimeout(fn, timeout));
|
|
||||||
const ONE_SECOND = 1000;
|
|
||||||
const FIVE_SECONDS = 5 * ONE_SECOND;
|
|
||||||
|
|
||||||
class BatchingWebSocket extends EventTarget {
|
class BatchingWebSocket extends EventTarget {
|
||||||
#worker;
|
#worker;
|
||||||
#openmct;
|
#openmct;
|
||||||
#showingRateLimitNotification;
|
#showingRateLimitNotification;
|
||||||
#maxBatchSize;
|
#maxBufferSize;
|
||||||
#applicationIsInitializing;
|
#throttleRate;
|
||||||
#maxBatchWait;
|
|
||||||
#firstBatchReceived;
|
#firstBatchReceived;
|
||||||
|
#lastBatchReceived;
|
||||||
|
#peakBufferSize = Number.NEGATIVE_INFINITY;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {import('openmct.js').OpenMCT} openmct
|
||||||
|
*/
|
||||||
constructor(openmct) {
|
constructor(openmct) {
|
||||||
super();
|
super();
|
||||||
// Install worker, register listeners etc.
|
// Install worker, register listeners etc.
|
||||||
@ -74,9 +92,8 @@ class BatchingWebSocket extends EventTarget {
|
|||||||
this.#worker = new Worker(workerUrl);
|
this.#worker = new Worker(workerUrl);
|
||||||
this.#openmct = openmct;
|
this.#openmct = openmct;
|
||||||
this.#showingRateLimitNotification = false;
|
this.#showingRateLimitNotification = false;
|
||||||
this.#maxBatchSize = Number.POSITIVE_INFINITY;
|
this.#maxBufferSize = Number.POSITIVE_INFINITY;
|
||||||
this.#maxBatchWait = ONE_SECOND;
|
this.#throttleRate = ONE_SECOND;
|
||||||
this.#applicationIsInitializing = true;
|
|
||||||
this.#firstBatchReceived = false;
|
this.#firstBatchReceived = false;
|
||||||
|
|
||||||
const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
|
const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
|
||||||
@ -89,20 +106,6 @@ class BatchingWebSocket extends EventTarget {
|
|||||||
},
|
},
|
||||||
{ once: true }
|
{ once: true }
|
||||||
);
|
);
|
||||||
|
|
||||||
openmct.once('start', () => {
|
|
||||||
// An idle callback is a pretty good indication that a complex display is done loading. At that point set the batch size more conservatively.
|
|
||||||
// Force it after 5 seconds if it hasn't happened yet.
|
|
||||||
requestIdleCallback(
|
|
||||||
() => {
|
|
||||||
this.#applicationIsInitializing = false;
|
|
||||||
this.setMaxBatchSize(this.#maxBatchSize);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
timeout: FIVE_SECONDS
|
|
||||||
}
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -137,57 +140,48 @@ class BatchingWebSocket extends EventTarget {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the strategy used to both decide which raw messages to batch, and how to group
|
* @param {number} maxBufferSize the maximum length of the receive buffer in characters.
|
||||||
* them.
|
|
||||||
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
|
|
||||||
* raw messages from the WebSocket.
|
|
||||||
*/
|
|
||||||
setBatchingStrategy(strategy) {
|
|
||||||
const serializedStrategy = {
|
|
||||||
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
|
|
||||||
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
|
|
||||||
};
|
|
||||||
|
|
||||||
this.#worker.postMessage({
|
|
||||||
type: 'setBatchingStrategy',
|
|
||||||
serializedStrategy
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {number} maxBatchSize the maximum length of a batch of messages. For example,
|
|
||||||
* the maximum number of telemetry values to batch before dropping them
|
|
||||||
* Note that this is a fail-safe that is only invoked if performance drops to the
|
* Note that this is a fail-safe that is only invoked if performance drops to the
|
||||||
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
|
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
|
||||||
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
|
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
|
||||||
* most recent telemetry. The user will be informed that telemetry has been dropped.
|
* most recent telemetry. The user will be informed that telemetry has been dropped.
|
||||||
*
|
*
|
||||||
* This should be set appropriately for the expected data rate. eg. If telemetry
|
* This should be set appropriately for the expected data rate. eg. If typical usage
|
||||||
* is received at 10Hz for each telemetry point, then a minimal combination of batch
|
* sees 2000 messages arriving at a client per second, with an average message size
|
||||||
* size and rate is 10 and 1000 respectively. Ideally you would add some margin, so
|
* of 500 bytes, then 2000 * 500 = 1000000 characters will be right on the limit.
|
||||||
* 15 would probably be a better batch size.
|
* In this scenario, a buffer size of 1500000 character might be more appropriate
|
||||||
|
* to allow some overhead for bursty telemetry, and temporary UI load during page
|
||||||
|
* load.
|
||||||
|
*
|
||||||
|
* The PerformanceIndicator plugin (openmct.plugins.PerformanceIndicator) gives
|
||||||
|
* statistics on buffer utilization. It can be used to scale the buffer appropriately.
|
||||||
*/
|
*/
|
||||||
setMaxBatchSize(maxBatchSize) {
|
setMaxBufferSize(maxBatchSize) {
|
||||||
this.#maxBatchSize = maxBatchSize;
|
this.#maxBufferSize = maxBatchSize;
|
||||||
if (!this.#applicationIsInitializing) {
|
this.#sendMaxBufferSizeToWorker(this.#maxBufferSize);
|
||||||
this.#sendMaxBatchSizeToWorker(this.#maxBatchSize);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
setMaxBatchWait(wait) {
|
setThrottleRate(throttleRate) {
|
||||||
this.#maxBatchWait = wait;
|
this.#throttleRate = throttleRate;
|
||||||
this.#sendBatchWaitToWorker(this.#maxBatchWait);
|
this.#sendThrottleRateToWorker(this.#throttleRate);
|
||||||
}
|
}
|
||||||
#sendMaxBatchSizeToWorker(maxBatchSize) {
|
setThrottleMessagePattern(throttleMessagePattern) {
|
||||||
this.#worker.postMessage({
|
this.#worker.postMessage({
|
||||||
type: 'setMaxBatchSize',
|
type: 'setThrottleMessagePattern',
|
||||||
maxBatchSize
|
throttleMessagePattern
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendBatchWaitToWorker(maxBatchWait) {
|
#sendMaxBufferSizeToWorker(maxBufferSize) {
|
||||||
this.#worker.postMessage({
|
this.#worker.postMessage({
|
||||||
type: 'setMaxBatchWait',
|
type: 'setMaxBufferSize',
|
||||||
maxBatchWait
|
maxBufferSize
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#sendThrottleRateToWorker(throttleRate) {
|
||||||
|
this.#worker.postMessage({
|
||||||
|
type: 'setThrottleRate',
|
||||||
|
throttleRate
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,9 +197,38 @@ class BatchingWebSocket extends EventTarget {
|
|||||||
|
|
||||||
#routeMessageToHandler(message) {
|
#routeMessageToHandler(message) {
|
||||||
if (message.data.type === 'batch') {
|
if (message.data.type === 'batch') {
|
||||||
this.start = Date.now();
|
|
||||||
const batch = message.data.batch;
|
const batch = message.data.batch;
|
||||||
if (batch.dropped === true && !this.#showingRateLimitNotification) {
|
const now = performance.now();
|
||||||
|
|
||||||
|
let currentBufferLength = message.data.currentBufferLength;
|
||||||
|
let maxBufferSize = message.data.maxBufferSize;
|
||||||
|
let parameterCount = batch.length;
|
||||||
|
if (this.#peakBufferSize < currentBufferLength) {
|
||||||
|
this.#peakBufferSize = currentBufferLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#openmct.performance !== undefined) {
|
||||||
|
if (!isNaN(this.#lastBatchReceived)) {
|
||||||
|
const elapsed = (now - this.#lastBatchReceived) / 1000;
|
||||||
|
this.#lastBatchReceived = now;
|
||||||
|
this.#openmct.performance.measurements.set(
|
||||||
|
'Parameters/s',
|
||||||
|
Math.floor(parameterCount / elapsed)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.#openmct.performance.measurements.set(
|
||||||
|
'Buff. Util. (bytes)',
|
||||||
|
`${currentBufferLength} / ${maxBufferSize}`
|
||||||
|
);
|
||||||
|
this.#openmct.performance.measurements.set(
|
||||||
|
'Peak Buff. Util. (bytes)',
|
||||||
|
`${this.#peakBufferSize} / ${maxBufferSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.start = Date.now();
|
||||||
|
const dropped = message.data.dropped;
|
||||||
|
if (dropped === true && !this.#showingRateLimitNotification) {
|
||||||
const notification = this.#openmct.notifications.alert(
|
const notification = this.#openmct.notifications.alert(
|
||||||
'Telemetry dropped due to client rate limiting.',
|
'Telemetry dropped due to client rate limiting.',
|
||||||
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
|
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
|
||||||
@ -240,18 +263,16 @@ class BatchingWebSocket extends EventTarget {
|
|||||||
console.warn(`Event loop is too busy to process batch.`);
|
console.warn(`Event loop is too busy to process batch.`);
|
||||||
this.#waitUntilIdleAndRequestNextBatch(batch);
|
this.#waitUntilIdleAndRequestNextBatch(batch);
|
||||||
} else {
|
} else {
|
||||||
// After ingesting a telemetry batch, wait until the event loop is idle again before
|
|
||||||
// informing the worker we are ready for another batch.
|
|
||||||
this.#readyForNextBatch();
|
this.#readyForNextBatch();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (waitedFor > ONE_SECOND) {
|
if (waitedFor > this.#throttleRate) {
|
||||||
console.warn(`Warning, batch processing took ${waitedFor}ms`);
|
console.warn(`Warning, batch processing took ${waitedFor}ms`);
|
||||||
}
|
}
|
||||||
this.#readyForNextBatch();
|
this.#readyForNextBatch();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ timeout: ONE_SECOND }
|
{ timeout: this.#throttleRate }
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,10 +24,6 @@ export default function installWorker() {
|
|||||||
const ONE_SECOND = 1000;
|
const ONE_SECOND = 1000;
|
||||||
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
|
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {import('./BatchingWebSocket').BatchingStrategy} BatchingStrategy
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides a WebSocket connection that is resilient to errors and dropouts.
|
* Provides a WebSocket connection that is resilient to errors and dropouts.
|
||||||
* On an error or dropout, will automatically reconnect.
|
* On an error or dropout, will automatically reconnect.
|
||||||
@ -215,17 +211,17 @@ export default function installWorker() {
|
|||||||
case 'message':
|
case 'message':
|
||||||
this.#websocket.enqueueMessage(message.data.message);
|
this.#websocket.enqueueMessage(message.data.message);
|
||||||
break;
|
break;
|
||||||
case 'setBatchingStrategy':
|
|
||||||
this.setBatchingStrategy(message);
|
|
||||||
break;
|
|
||||||
case 'readyForNextBatch':
|
case 'readyForNextBatch':
|
||||||
this.#messageBatcher.readyForNextBatch();
|
this.#messageBatcher.readyForNextBatch();
|
||||||
break;
|
break;
|
||||||
case 'setMaxBatchSize':
|
case 'setMaxBufferSize':
|
||||||
this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize);
|
this.#messageBatcher.setMaxBufferSize(message.data.maxBufferSize);
|
||||||
break;
|
break;
|
||||||
case 'setMaxBatchWait':
|
case 'setThrottleRate':
|
||||||
this.#messageBatcher.setMaxBatchWait(message.data.maxBatchWait);
|
this.#messageBatcher.setThrottleRate(message.data.throttleRate);
|
||||||
|
break;
|
||||||
|
case 'setThrottleMessagePattern':
|
||||||
|
this.#messageBatcher.setThrottleMessagePattern(message.data.throttleMessagePattern);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new Error(`Unknown message type: ${type}`);
|
throw new Error(`Unknown message type: ${type}`);
|
||||||
@ -238,122 +234,69 @@ export default function installWorker() {
|
|||||||
disconnect() {
|
disconnect() {
|
||||||
this.#websocket.disconnect();
|
this.#websocket.disconnect();
|
||||||
}
|
}
|
||||||
setBatchingStrategy(message) {
|
|
||||||
const { serializedStrategy } = message.data;
|
|
||||||
const batchingStrategy = {
|
|
||||||
// eslint-disable-next-line no-new-func
|
|
||||||
shouldBatchMessage: new Function(`return ${serializedStrategy.shouldBatchMessage}`)(),
|
|
||||||
// eslint-disable-next-line no-new-func
|
|
||||||
getBatchIdFromMessage: new Function(`return ${serializedStrategy.getBatchIdFromMessage}`)()
|
|
||||||
// Will also include maximum batch length here
|
|
||||||
};
|
|
||||||
this.#messageBatcher.setBatchingStrategy(batchingStrategy);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Received messages from the WebSocket, and passes them along to the
|
* Responsible for buffering messages
|
||||||
* Worker interface and back to the main thread.
|
|
||||||
*/
|
*/
|
||||||
class WebSocketToWorkerMessageBroker {
|
class MessageBuffer {
|
||||||
#worker;
|
#buffer;
|
||||||
#messageBatcher;
|
#currentBufferLength;
|
||||||
|
#dropped;
|
||||||
constructor(messageBatcher, worker) {
|
#maxBufferSize;
|
||||||
this.#messageBatcher = messageBatcher;
|
|
||||||
this.#worker = worker;
|
|
||||||
}
|
|
||||||
|
|
||||||
routeMessageToHandler(data) {
|
|
||||||
if (this.#messageBatcher.shouldBatchMessage(data)) {
|
|
||||||
this.#messageBatcher.addMessageToBatch(data);
|
|
||||||
} else {
|
|
||||||
this.#worker.postMessage({
|
|
||||||
type: 'message',
|
|
||||||
message: data
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Responsible for batching messages according to the defined batching strategy.
|
|
||||||
*/
|
|
||||||
class MessageBatcher {
|
|
||||||
#batch;
|
|
||||||
#batchingStrategy;
|
|
||||||
#hasBatch = false;
|
|
||||||
#maxBatchSize;
|
|
||||||
#readyForNextBatch;
|
#readyForNextBatch;
|
||||||
#worker;
|
#worker;
|
||||||
#throttledSendNextBatch;
|
#throttledSendNextBatch;
|
||||||
|
#throttleMessagePattern;
|
||||||
|
|
||||||
constructor(worker) {
|
constructor(worker) {
|
||||||
// No dropping telemetry unless we're explicitly told to.
|
// No dropping telemetry unless we're explicitly told to.
|
||||||
this.#maxBatchSize = Number.POSITIVE_INFINITY;
|
this.#maxBufferSize = Number.POSITIVE_INFINITY;
|
||||||
this.#readyForNextBatch = false;
|
this.#readyForNextBatch = false;
|
||||||
this.#worker = worker;
|
this.#worker = worker;
|
||||||
this.#resetBatch();
|
this.#resetBatch();
|
||||||
this.setMaxBatchWait(ONE_SECOND);
|
this.setThrottleRate(ONE_SECOND);
|
||||||
}
|
}
|
||||||
#resetBatch() {
|
#resetBatch() {
|
||||||
this.#batch = {};
|
//this.#batch = {};
|
||||||
this.#hasBatch = false;
|
this.#buffer = [];
|
||||||
|
this.#currentBufferLength = 0;
|
||||||
|
this.#dropped = false;
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* @param {BatchingStrategy} strategy
|
addMessageToBuffer(message) {
|
||||||
*/
|
this.#buffer.push(message);
|
||||||
setBatchingStrategy(strategy) {
|
this.#currentBufferLength += message.length;
|
||||||
this.#batchingStrategy = strategy;
|
|
||||||
}
|
for (
|
||||||
/**
|
let i = 0;
|
||||||
* Applies the `shouldBatchMessage` function from the supplied batching strategy
|
this.#currentBufferLength > this.#maxBufferSize && i < this.#buffer.length;
|
||||||
* to each message to determine if it should be added to a batch. If not batched,
|
i++
|
||||||
* the message is immediately sent over the worker to the main thread.
|
) {
|
||||||
* @param {any} message the message received from the WebSocket. See the WebSocket
|
const messageToConsider = this.#buffer[i];
|
||||||
* documentation for more details -
|
if (this.#shouldThrottle(messageToConsider)) {
|
||||||
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
|
this.#buffer.splice(i, 1);
|
||||||
* @returns
|
this.#currentBufferLength -= messageToConsider.length;
|
||||||
*/
|
this.#dropped = true;
|
||||||
shouldBatchMessage(message) {
|
}
|
||||||
return (
|
|
||||||
this.#batchingStrategy.shouldBatchMessage &&
|
|
||||||
this.#batchingStrategy.shouldBatchMessage(message)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Adds the given message to a batch. The batch group that the message is added
|
|
||||||
* to will be determined by the value returned by `getBatchIdFromMessage`.
|
|
||||||
* @param {any} message the message received from the WebSocket. See the WebSocket
|
|
||||||
* documentation for more details -
|
|
||||||
* https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data
|
|
||||||
*/
|
|
||||||
addMessageToBatch(message) {
|
|
||||||
const batchId = this.#batchingStrategy.getBatchIdFromMessage(message);
|
|
||||||
let batch = this.#batch[batchId];
|
|
||||||
if (batch === undefined) {
|
|
||||||
this.#hasBatch = true;
|
|
||||||
batch = this.#batch[batchId] = [message];
|
|
||||||
} else {
|
|
||||||
batch.push(message);
|
|
||||||
}
|
|
||||||
if (batch.length > this.#maxBatchSize) {
|
|
||||||
console.warn(
|
|
||||||
`Exceeded max batch size of ${this.#maxBatchSize} for ${batchId}. Dropping value.`
|
|
||||||
);
|
|
||||||
batch.shift();
|
|
||||||
this.#batch.dropped = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.#readyForNextBatch) {
|
if (this.#readyForNextBatch) {
|
||||||
this.#throttledSendNextBatch();
|
this.#throttledSendNextBatch();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
setMaxBatchSize(maxBatchSize) {
|
|
||||||
this.#maxBatchSize = maxBatchSize;
|
#shouldThrottle(message) {
|
||||||
|
return (
|
||||||
|
this.#throttleMessagePattern !== undefined && this.#throttleMessagePattern.test(message)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
setMaxBatchWait(maxBatchWait) {
|
|
||||||
this.#throttledSendNextBatch = throttle(this.#sendNextBatch.bind(this), maxBatchWait);
|
setMaxBufferSize(maxBufferSize) {
|
||||||
|
this.#maxBufferSize = maxBufferSize;
|
||||||
|
}
|
||||||
|
setThrottleRate(throttleRate) {
|
||||||
|
this.#throttledSendNextBatch = throttle(this.#sendNextBatch.bind(this), throttleRate);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Indicates that client code is ready to receive the next batch of
|
* Indicates that client code is ready to receive the next batch of
|
||||||
@ -362,21 +305,33 @@ export default function installWorker() {
|
|||||||
* any new data is available.
|
* any new data is available.
|
||||||
*/
|
*/
|
||||||
readyForNextBatch() {
|
readyForNextBatch() {
|
||||||
if (this.#hasBatch) {
|
if (this.#hasData()) {
|
||||||
this.#throttledSendNextBatch();
|
this.#throttledSendNextBatch();
|
||||||
} else {
|
} else {
|
||||||
this.#readyForNextBatch = true;
|
this.#readyForNextBatch = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#sendNextBatch() {
|
#sendNextBatch() {
|
||||||
const batch = this.#batch;
|
const buffer = this.#buffer;
|
||||||
|
const dropped = this.#dropped;
|
||||||
|
const currentBufferLength = this.#currentBufferLength;
|
||||||
|
|
||||||
this.#resetBatch();
|
this.#resetBatch();
|
||||||
this.#worker.postMessage({
|
this.#worker.postMessage({
|
||||||
type: 'batch',
|
type: 'batch',
|
||||||
batch
|
dropped,
|
||||||
|
currentBufferLength: currentBufferLength,
|
||||||
|
maxBufferSize: this.#maxBufferSize,
|
||||||
|
batch: buffer
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#readyForNextBatch = false;
|
this.#readyForNextBatch = false;
|
||||||
this.#hasBatch = false;
|
}
|
||||||
|
#hasData() {
|
||||||
|
return this.#currentBufferLength > 0;
|
||||||
|
}
|
||||||
|
setThrottleMessagePattern(priorityMessagePattern) {
|
||||||
|
this.#throttleMessagePattern = new RegExp(priorityMessagePattern, 'm');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -408,15 +363,14 @@ export default function installWorker() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const websocket = new ResilientWebSocket(self);
|
const websocket = new ResilientWebSocket(self);
|
||||||
const messageBatcher = new MessageBatcher(self);
|
const messageBuffer = new MessageBuffer(self);
|
||||||
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher);
|
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBuffer);
|
||||||
const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self);
|
|
||||||
|
|
||||||
self.addEventListener('message', (message) => {
|
self.addEventListener('message', (message) => {
|
||||||
workerBroker.routeMessageToHandler(message);
|
workerBroker.routeMessageToHandler(message);
|
||||||
});
|
});
|
||||||
websocket.registerMessageCallback((data) => {
|
websocket.registerMessageCallback((data) => {
|
||||||
websocketBroker.routeMessageToHandler(data);
|
messageBuffer.addMessageToBuffer(data);
|
||||||
});
|
});
|
||||||
|
|
||||||
self.websocketInstance = websocket;
|
self.websocketInstance = websocket;
|
||||||
|
@ -19,14 +19,23 @@
|
|||||||
* this source code distribution or the Licensing information page available
|
* this source code distribution or the Licensing information page available
|
||||||
* at runtime from the About dialog for additional information.
|
* at runtime from the About dialog for additional information.
|
||||||
*****************************************************************************/
|
*****************************************************************************/
|
||||||
|
const PERFORMANCE_OVERLAY_RENDER_INTERVAL = 1000;
|
||||||
|
|
||||||
export default function PerformanceIndicator() {
|
export default function PerformanceIndicator() {
|
||||||
return function install(openmct) {
|
return function install(openmct) {
|
||||||
let frames = 0;
|
let frames = 0;
|
||||||
let lastCalculated = performance.now();
|
let lastCalculated = performance.now();
|
||||||
const indicator = openmct.indicators.simpleIndicator();
|
openmct.performance = {
|
||||||
|
measurements: new Map()
|
||||||
|
};
|
||||||
|
|
||||||
|
const indicator = openmct.indicators.simpleIndicator();
|
||||||
|
indicator.key = 'performance-indicator';
|
||||||
indicator.text('~ fps');
|
indicator.text('~ fps');
|
||||||
|
indicator.description('Performance Indicator');
|
||||||
indicator.statusClass('s-status-info');
|
indicator.statusClass('s-status-info');
|
||||||
|
indicator.on('click', showOverlay);
|
||||||
|
|
||||||
openmct.indicators.add(indicator);
|
openmct.indicators.add(indicator);
|
||||||
|
|
||||||
let rafHandle = requestAnimationFrame(incrementFrames);
|
let rafHandle = requestAnimationFrame(incrementFrames);
|
||||||
@ -58,5 +67,58 @@ export default function PerformanceIndicator() {
|
|||||||
indicator.statusClass('s-status-error');
|
indicator.statusClass('s-status-error');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function showOverlay() {
|
||||||
|
const overlayStylesText = `
|
||||||
|
#c-performance-indicator--overlay {
|
||||||
|
background-color:rgba(0,0,0,0.5);
|
||||||
|
position: absolute;
|
||||||
|
width: 300px;
|
||||||
|
left: calc(50% - 300px);
|
||||||
|
}
|
||||||
|
`;
|
||||||
|
const overlayMarkup = `
|
||||||
|
<div id="c-performance-indicator--overlay" title="Performance Overlay">
|
||||||
|
<table id="c-performance-indicator--table">
|
||||||
|
<tr class="c-performance-indicator--row"><td class="c-performance-indicator--measurement-name"></td><td class="c-performance-indicator--measurement-value"></td></tr>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
`;
|
||||||
|
const overlayTemplate = document.createElement('div');
|
||||||
|
overlayTemplate.innerHTML = overlayMarkup;
|
||||||
|
const overlay = overlayTemplate.cloneNode(true);
|
||||||
|
overlay.querySelector('.c-performance-indicator--row').remove();
|
||||||
|
const overlayStyles = document.createElement('style');
|
||||||
|
overlayStyles.appendChild(document.createTextNode(overlayStylesText));
|
||||||
|
|
||||||
|
document.head.appendChild(overlayStyles);
|
||||||
|
document.body.appendChild(overlay);
|
||||||
|
|
||||||
|
indicator.off('click', showOverlay);
|
||||||
|
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
overlay.querySelector('#c-performance-indicator--table').innerHTML = '';
|
||||||
|
|
||||||
|
for (const [name, value] of openmct.performance.measurements.entries()) {
|
||||||
|
const newRow = overlayTemplate
|
||||||
|
.querySelector('.c-performance-indicator--row')
|
||||||
|
.cloneNode(true);
|
||||||
|
newRow.querySelector('.c-performance-indicator--measurement-name').innerText = name;
|
||||||
|
newRow.querySelector('.c-performance-indicator--measurement-value').innerText = value;
|
||||||
|
overlay.querySelector('#c-performance-indicator--table').appendChild(newRow);
|
||||||
|
}
|
||||||
|
}, PERFORMANCE_OVERLAY_RENDER_INTERVAL);
|
||||||
|
|
||||||
|
indicator.on(
|
||||||
|
'click',
|
||||||
|
() => {
|
||||||
|
overlayStyles.remove();
|
||||||
|
overlay.remove();
|
||||||
|
indicator.on('click', showOverlay);
|
||||||
|
clearInterval(interval);
|
||||||
|
},
|
||||||
|
{ once: true, capture: true }
|
||||||
|
);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user