mirror of
https://github.com/nasa/openmct.git
synced 2025-06-16 06:08:11 +00:00
Request batch when idle (#7526)
See https://github.com/nasa/openmct/pull/7526 for details
This commit is contained in:
@ -20,7 +20,6 @@
|
||||
* at runtime from the About dialog for additional information.
|
||||
*****************************************************************************/
|
||||
import installWorker from './WebSocketWorker.js';
|
||||
const DEFAULT_RATE_MS = 1000;
|
||||
/**
|
||||
* Describes the strategy to be used when batching WebSocket messages
|
||||
*
|
||||
@ -51,11 +50,21 @@ const DEFAULT_RATE_MS = 1000;
|
||||
*
|
||||
* @memberof module:openmct.telemetry
|
||||
*/
|
||||
// Shim for Internet Explorer, I mean Safari. It doesn't support requestIdleCallback, but it's in a tech preview, so it will be dropping soon.
|
||||
const requestIdleCallback =
|
||||
// eslint-disable-next-line compat/compat
|
||||
window.requestIdleCallback ?? ((fn, { timeout }) => setTimeout(fn, timeout));
|
||||
const ONE_SECOND = 1000;
|
||||
const FIVE_SECONDS = 5 * ONE_SECOND;
|
||||
|
||||
class BatchingWebSocket extends EventTarget {
|
||||
#worker;
|
||||
#openmct;
|
||||
#showingRateLimitNotification;
|
||||
#rate;
|
||||
#maxBatchSize;
|
||||
#applicationIsInitializing;
|
||||
#maxBatchWait;
|
||||
#firstBatchReceived;
|
||||
|
||||
constructor(openmct) {
|
||||
super();
|
||||
@ -66,7 +75,10 @@ class BatchingWebSocket extends EventTarget {
|
||||
this.#worker = new Worker(workerUrl);
|
||||
this.#openmct = openmct;
|
||||
this.#showingRateLimitNotification = false;
|
||||
this.#rate = DEFAULT_RATE_MS;
|
||||
this.#maxBatchSize = Number.POSITIVE_INFINITY;
|
||||
this.#maxBatchWait = ONE_SECOND;
|
||||
this.#applicationIsInitializing = true;
|
||||
this.#firstBatchReceived = false;
|
||||
|
||||
const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
|
||||
this.#worker.addEventListener('message', routeMessageToHandler);
|
||||
@ -78,6 +90,20 @@ class BatchingWebSocket extends EventTarget {
|
||||
},
|
||||
{ once: true }
|
||||
);
|
||||
|
||||
openmct.once('start', () => {
|
||||
// An idle callback is a pretty good indication that a complex display is done loading. At that point set the batch size more conservatively.
|
||||
// Force it after 5 seconds if it hasn't happened yet.
|
||||
requestIdleCallback(
|
||||
() => {
|
||||
this.#applicationIsInitializing = false;
|
||||
this.setMaxBatchSize(this.#maxBatchSize);
|
||||
},
|
||||
{
|
||||
timeout: FIVE_SECONDS
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -129,14 +155,6 @@ class BatchingWebSocket extends EventTarget {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* When using batching, sets the rate at which batches of messages are released.
|
||||
* @param {Number} rate the amount of time to wait, in ms, between batches.
|
||||
*/
|
||||
setRate(rate) {
|
||||
this.#rate = rate;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Number} maxBatchSize the maximum length of a batch of messages. For example,
|
||||
* the maximum number of telemetry values to batch before dropping them
|
||||
@ -151,12 +169,29 @@ class BatchingWebSocket extends EventTarget {
|
||||
* 15 would probably be a better batch size.
|
||||
*/
|
||||
setMaxBatchSize(maxBatchSize) {
|
||||
this.#maxBatchSize = maxBatchSize;
|
||||
if (!this.#applicationIsInitializing) {
|
||||
this.#sendMaxBatchSizeToWorker(this.#maxBatchSize);
|
||||
}
|
||||
}
|
||||
setMaxBatchWait(wait) {
|
||||
this.#maxBatchWait = wait;
|
||||
this.#sendBatchWaitToWorker(this.#maxBatchWait);
|
||||
}
|
||||
#sendMaxBatchSizeToWorker(maxBatchSize) {
|
||||
this.#worker.postMessage({
|
||||
type: 'setMaxBatchSize',
|
||||
maxBatchSize
|
||||
});
|
||||
}
|
||||
|
||||
#sendBatchWaitToWorker(maxBatchWait) {
|
||||
this.#worker.postMessage({
|
||||
type: 'setMaxBatchWait',
|
||||
maxBatchWait
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect the associated WebSocket. Generally speaking there is no need to call
|
||||
* this manually.
|
||||
@ -169,7 +204,9 @@ class BatchingWebSocket extends EventTarget {
|
||||
|
||||
#routeMessageToHandler(message) {
|
||||
if (message.data.type === 'batch') {
|
||||
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
|
||||
this.start = Date.now();
|
||||
const batch = message.data.batch;
|
||||
if (batch.dropped === true && !this.#showingRateLimitNotification) {
|
||||
const notification = this.#openmct.notifications.alert(
|
||||
'Telemetry dropped due to client rate limiting.',
|
||||
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
|
||||
@ -179,16 +216,45 @@ class BatchingWebSocket extends EventTarget {
|
||||
this.#showingRateLimitNotification = false;
|
||||
});
|
||||
}
|
||||
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
|
||||
setTimeout(() => {
|
||||
this.#readyForNextBatch();
|
||||
}, this.#rate);
|
||||
|
||||
this.dispatchEvent(new CustomEvent('batch', { detail: batch }));
|
||||
this.#waitUntilIdleAndRequestNextBatch(batch);
|
||||
} else if (message.data.type === 'message') {
|
||||
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));
|
||||
} else if (message.data.type === 'reconnected') {
|
||||
this.dispatchEvent(new CustomEvent('reconnected'));
|
||||
} else {
|
||||
throw new Error(`Unknown message type: ${message.data.type}`);
|
||||
}
|
||||
}
|
||||
|
||||
#waitUntilIdleAndRequestNextBatch(batch) {
|
||||
requestIdleCallback(
|
||||
(state) => {
|
||||
if (this.#firstBatchReceived === false) {
|
||||
this.#firstBatchReceived = true;
|
||||
}
|
||||
const now = Date.now();
|
||||
const waitedFor = now - this.start;
|
||||
if (state.didTimeout === true) {
|
||||
if (document.visibilityState === 'visible') {
|
||||
console.warn(`Event loop is too busy to process batch.`);
|
||||
this.#waitUntilIdleAndRequestNextBatch(batch);
|
||||
} else {
|
||||
// After ingesting a telemetry batch, wait until the event loop is idle again before
|
||||
// informing the worker we are ready for another batch.
|
||||
this.#readyForNextBatch();
|
||||
}
|
||||
} else {
|
||||
if (waitedFor > ONE_SECOND) {
|
||||
console.warn(`Warning, batch processing took ${waitedFor}ms`);
|
||||
}
|
||||
this.#readyForNextBatch();
|
||||
}
|
||||
},
|
||||
{ timeout: ONE_SECOND }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export default BatchingWebSocket;
|
||||
|
@ -85,6 +85,7 @@ const SUBSCRIBE_STRATEGY = {
|
||||
export default class TelemetryAPI {
|
||||
#isGreedyLAD;
|
||||
#subscribeCache;
|
||||
#hasReturnedFirstData;
|
||||
|
||||
get SUBSCRIBE_STRATEGY() {
|
||||
return SUBSCRIBE_STRATEGY;
|
||||
@ -108,6 +109,7 @@ export default class TelemetryAPI {
|
||||
this.#isGreedyLAD = true;
|
||||
this.BatchingWebSocket = BatchingWebSocket;
|
||||
this.#subscribeCache = {};
|
||||
this.#hasReturnedFirstData = false;
|
||||
}
|
||||
|
||||
abortAllRequests() {
|
||||
@ -383,7 +385,10 @@ export default class TelemetryAPI {
|
||||
arguments[1] = await this.applyRequestInterceptors(domainObject, arguments[1]);
|
||||
try {
|
||||
const telemetry = await provider.request(...arguments);
|
||||
|
||||
if (!this.#hasReturnedFirstData) {
|
||||
this.#hasReturnedFirstData = true;
|
||||
performance.mark('firstHistoricalDataReturned');
|
||||
}
|
||||
return telemetry;
|
||||
} catch (error) {
|
||||
if (error.name !== 'AbortError') {
|
||||
|
@ -21,6 +21,7 @@
|
||||
*****************************************************************************/
|
||||
/* eslint-disable max-classes-per-file */
|
||||
export default function installWorker() {
|
||||
const ONE_SECOND = 1000;
|
||||
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
|
||||
|
||||
/**
|
||||
@ -44,6 +45,13 @@ export default function installWorker() {
|
||||
#currentWaitIndex = 0;
|
||||
#messageCallbacks = [];
|
||||
#wsUrl;
|
||||
#reconnecting = false;
|
||||
#worker;
|
||||
|
||||
constructor(worker) {
|
||||
super();
|
||||
this.#worker = worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Establish a new WebSocket connection to the given URL
|
||||
@ -62,6 +70,9 @@ export default function installWorker() {
|
||||
this.#isConnecting = true;
|
||||
|
||||
this.#webSocket = new WebSocket(url);
|
||||
//Exposed to e2e tests so that the websocket can be manipulated during tests. Cannot find any other way to do this.
|
||||
// Playwright does not support forcing websocket state changes.
|
||||
this.#worker.currentWebSocket = this.#webSocket;
|
||||
|
||||
const boundConnected = this.#connected.bind(this);
|
||||
this.#webSocket.addEventListener('open', boundConnected);
|
||||
@ -100,12 +111,17 @@ export default function installWorker() {
|
||||
}
|
||||
|
||||
#connected() {
|
||||
console.debug('Websocket connected.');
|
||||
console.info('Websocket connected.');
|
||||
this.#isConnected = true;
|
||||
this.#isConnecting = false;
|
||||
this.#currentWaitIndex = 0;
|
||||
|
||||
this.dispatchEvent(new Event('connected'));
|
||||
if (this.#reconnecting) {
|
||||
this.#worker.postMessage({
|
||||
type: 'reconnected'
|
||||
});
|
||||
this.#reconnecting = false;
|
||||
}
|
||||
|
||||
this.#flushQueue();
|
||||
}
|
||||
@ -138,6 +154,7 @@ export default function installWorker() {
|
||||
if (this.#reconnectTimeoutHandle) {
|
||||
return;
|
||||
}
|
||||
this.#reconnecting = true;
|
||||
|
||||
this.#reconnectTimeoutHandle = setTimeout(() => {
|
||||
this.connect(this.#wsUrl);
|
||||
@ -207,6 +224,9 @@ export default function installWorker() {
|
||||
case 'setMaxBatchSize':
|
||||
this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize);
|
||||
break;
|
||||
case 'setMaxBatchWait':
|
||||
this.#messageBatcher.setMaxBatchWait(message.data.maxBatchWait);
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unknown message type: ${type}`);
|
||||
}
|
||||
@ -245,7 +265,6 @@ export default function installWorker() {
|
||||
}
|
||||
|
||||
routeMessageToHandler(data) {
|
||||
//Implement batching here
|
||||
if (this.#messageBatcher.shouldBatchMessage(data)) {
|
||||
this.#messageBatcher.addMessageToBatch(data);
|
||||
} else {
|
||||
@ -267,12 +286,15 @@ export default function installWorker() {
|
||||
#maxBatchSize;
|
||||
#readyForNextBatch;
|
||||
#worker;
|
||||
#throttledSendNextBatch;
|
||||
|
||||
constructor(worker) {
|
||||
this.#maxBatchSize = 10;
|
||||
// No dropping telemetry unless we're explicitly told to.
|
||||
this.#maxBatchSize = Number.POSITIVE_INFINITY;
|
||||
this.#readyForNextBatch = false;
|
||||
this.#worker = worker;
|
||||
this.#resetBatch();
|
||||
this.setMaxBatchWait(ONE_SECOND);
|
||||
}
|
||||
#resetBatch() {
|
||||
this.#batch = {};
|
||||
@ -310,23 +332,29 @@ export default function installWorker() {
|
||||
const batchId = this.#batchingStrategy.getBatchIdFromMessage(message);
|
||||
let batch = this.#batch[batchId];
|
||||
if (batch === undefined) {
|
||||
this.#hasBatch = true;
|
||||
batch = this.#batch[batchId] = [message];
|
||||
} else {
|
||||
batch.push(message);
|
||||
}
|
||||
if (batch.length > this.#maxBatchSize) {
|
||||
console.warn(
|
||||
`Exceeded max batch size of ${this.#maxBatchSize} for ${batchId}. Dropping value.`
|
||||
);
|
||||
batch.shift();
|
||||
this.#batch.dropped = this.#batch.dropped || true;
|
||||
this.#batch.dropped = true;
|
||||
}
|
||||
|
||||
if (this.#readyForNextBatch) {
|
||||
this.#sendNextBatch();
|
||||
} else {
|
||||
this.#hasBatch = true;
|
||||
this.#throttledSendNextBatch();
|
||||
}
|
||||
}
|
||||
setMaxBatchSize(maxBatchSize) {
|
||||
this.#maxBatchSize = maxBatchSize;
|
||||
}
|
||||
setMaxBatchWait(maxBatchWait) {
|
||||
this.#throttledSendNextBatch = throttle(this.#sendNextBatch.bind(this), maxBatchWait);
|
||||
}
|
||||
/**
|
||||
* Indicates that client code is ready to receive the next batch of
|
||||
* messages. If a batch is available, it will be immediately sent.
|
||||
@ -335,7 +363,7 @@ export default function installWorker() {
|
||||
*/
|
||||
readyForNextBatch() {
|
||||
if (this.#hasBatch) {
|
||||
this.#sendNextBatch();
|
||||
this.#throttledSendNextBatch();
|
||||
} else {
|
||||
this.#readyForNextBatch = true;
|
||||
}
|
||||
@ -352,7 +380,34 @@ export default function installWorker() {
|
||||
}
|
||||
}
|
||||
|
||||
const websocket = new ResilientWebSocket();
|
||||
function throttle(callback, wait) {
|
||||
let last = 0;
|
||||
let throttling = false;
|
||||
|
||||
return function (...args) {
|
||||
if (throttling) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = performance.now();
|
||||
const timeSinceLast = now - last;
|
||||
|
||||
if (timeSinceLast >= wait) {
|
||||
last = now;
|
||||
callback(...args);
|
||||
} else if (!throttling) {
|
||||
throttling = true;
|
||||
|
||||
setTimeout(() => {
|
||||
last = performance.now();
|
||||
throttling = false;
|
||||
callback(...args);
|
||||
}, wait - timeSinceLast);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const websocket = new ResilientWebSocket(self);
|
||||
const messageBatcher = new MessageBatcher(self);
|
||||
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher);
|
||||
const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self);
|
||||
@ -363,4 +418,6 @@ export default function installWorker() {
|
||||
websocket.registerMessageCallback((data) => {
|
||||
websocketBroker.routeMessageToHandler(data);
|
||||
});
|
||||
|
||||
self.websocketInstance = websocket;
|
||||
}
|
||||
|
Reference in New Issue
Block a user