worker ready

This commit is contained in:
Scott Bell 2024-08-02 15:57:15 +02:00
parent 0dd04426f5
commit ff814544c6
5 changed files with 118 additions and 72 deletions

View File

@ -3,46 +3,79 @@ import { EventEmitter } from 'eventemitter3';
export default class CompsManager extends EventEmitter {
#openmct;
#domainObject;
#composition;
#telemetryObjects = {};
#telemetryCollections = {};
#managerLoadedPromise;
constructor(openmct, domainObject) {
super();
this.#openmct = openmct;
this.#domainObject = domainObject;
// Load the composition
this.#managerLoadedPromise = this.#loadComposition();
}
#removeTelemetryObject(telemetryObject) {
load() {
return this.#managerLoadedPromise;
}
async #loadComposition() {
this.#composition = this.#openmct.composition.get(this.#domainObject);
if (this.#composition) {
this.#composition.on('add', this.#addTelemetryObject);
this.#composition.on('remove', this.#removeTelemetryObject);
await this.#composition.load();
}
}
#removeTelemetryObject = (telemetryObject) => {
console.debug('❌ CompsManager: removeTelemetryObject', telemetryObject);
const keyString = this.#openmct.objects.makeKeyString(telemetryObject.identifier);
delete this.#telemetryObjects[keyString];
this.#telemetryCollections[keyString]?.destroy();
delete this.#telemetryCollections[keyString];
};
requestUnderlyingTelemetry() {
const underlyingTelemetry = {};
Object.keys(this.#telemetryCollections).forEach((collectionKey) => {
const collection = this.#telemetryCollections[collectionKey];
underlyingTelemetry[collectionKey] = collection.getAll();
});
return underlyingTelemetry;
}
telemetryProcessor(telemetryObjects) {
console.debug('Telemetry Processor', telemetryObjects);
#telemetryProcessor(telemetryObjects) {
this.emit('underlyingTelemetryUpdated', telemetryObjects);
}
clearData() {
#clearData() {
console.debug('Clear Data');
}
#addTelemetryObject(telemetryObject) {
getExpression() {
return 'a + b';
}
#addTelemetryObject = (telemetryObject) => {
console.debug('📢 CompsManager: #addTelemetryObject', telemetryObject);
const keyString = this.#openmct.objects.makeKeyString(telemetryObject.identifier);
this.#telemetryObjects[keyString] = telemetryObject;
this.#telemetryCollections[keyString] =
this.#openmct.telemetry.requestCollection(telemetryObject);
this.#telemetryCollections[keyString].on('add', this.telemetryProcessor);
this.#telemetryCollections[keyString].on('clear', this.clearData);
this.#telemetryCollections[keyString].on('add', this.#telemetryProcessor);
this.#telemetryCollections[keyString].on('clear', this.#clearData);
this.#telemetryCollections[keyString].load();
}
};
static getCompsManager(domainObject, openmct, compsManagerPool) {
const id = openmct.objects.makeKeyString(domainObject.identifier);
if (!compsManagerPool[id]) {
compsManagerPool[id] = new CompsManager(domainObject, this.openmct);
compsManagerPool[id] = new CompsManager(openmct, domainObject);
}
return compsManagerPool[id];

View File

@ -6,21 +6,26 @@ onconnect = function (e) {
port.onmessage = function (event) {
console.debug('🧮 Comps Math Worker message:', event);
const { type, id, data, expression } = event.data;
if (type === 'calculate') {
const { type, callbackID, telemetryForComps, expression } = event.data;
if (type === 'calculateRequest' || type === 'calculateSubscription') {
try {
const result = data.map((point) => {
// the reply type is different for request and subscription
const replyType =
type === 'calculateRequest'
? 'calculationRequestResult'
: 'calculationSubscriptionResult';
const result = telemetryForComps.map((point) => {
// Using Math.js to evaluate the expression against the data
return { ...point, value: evaluate(expression, point) };
});
port.postMessage({ type: 'response', id, result });
port.postMessage({ type: replyType, callbackID, result });
} catch (error) {
port.postMessage({ type: 'error', id, error: error.message });
port.postMessage({ type: 'error', callbackID, error: error.message });
}
} else if (type === 'init') {
port.postMessage({ type: 'ready' });
} else {
port.postMessage({ type: 'error', id, error: 'Invalid message type' });
port.postMessage({ type: 'error', callbackID, error: 'Invalid message type' });
}
};
};

View File

@ -19,23 +19,20 @@
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import CompsManager from './CompsManager.js';
export default class CompsTelemetryProvider {
#telemetryObjects = {};
#telemetryCollections = {};
#composition = [];
#openmct = null;
#sharedWorker = null;
#compsManagerPool = null;
#lastUniqueID = 1;
#requestPromises = {};
#subscriptionCallbacks = {};
constructor(openmct) {
constructor(openmct, compsManagerPool) {
this.#openmct = openmct;
}
#removeTelemetryObject(telemetryObject) {
const keyString = this.openmct.objects.makeKeyString(telemetryObject.identifier);
delete this.#telemetryObjects[keyString];
this.#telemetryCollections[keyString]?.destroy();
delete this.#telemetryCollections[keyString];
this.#compsManagerPool = compsManagerPool;
this.#startSharedWorker();
}
isTelemetryObject(domainObject) {
@ -50,26 +47,55 @@ export default class CompsTelemetryProvider {
return domainObject.type === 'comps';
}
#getCallbackID() {
return this.#lastUniqueID++;
}
// eslint-disable-next-line require-await
async request(domainObject, options) {
// get the telemetry from the collections
const telmetryToSend = {};
Object.keys(this.#telemetryCollections).forEach((keyString) => {
telmetryToSend[keyString] = this.#telemetryCollections[keyString].getAll(
domainObject,
options
);
});
this.#sharedWorker.port.postMessage({
type: 'calculate',
data: telmetryToSend,
expression: 'a + b'
const specificCompsManager = CompsManager.getCompsManager(
domainObject,
this.#openmct,
this.#compsManagerPool
);
await specificCompsManager.load();
const telemetryForComps = specificCompsManager.requestUnderlyingTelemetry();
console.debug('🏟️ Telemetry for comps:', telemetryForComps);
const expression = specificCompsManager.getExpression();
// need to create callbackID with a promise for future execution
return new Promise((resolve, reject) => {
const callbackID = this.#getCallbackID();
this.#requestPromises[callbackID] = { resolve, reject };
this.#sharedWorker.port.postMessage({
type: 'calculateRequest',
telemetryForComps,
expression,
callbackID
});
});
}
subscribe(domainObject, callback) {
// TODO: add to listener list and return a function to remove it
return () => {};
const specificCompsManager = CompsManager.getCompsManager(
domainObject,
this.#openmct,
this.#compsManagerPool
);
const callbackID = this.#getCallbackID();
this.#subscriptionCallbacks[callbackID] = callback;
specificCompsManager.on('underlyingTelemetryUpdated', (newTelemetry) => {
const expression = specificCompsManager.getExpression();
this.#sharedWorker.port.postMessage({
type: 'calculateSubscription',
telemetryForComps: newTelemetry,
expression,
callbackID
});
});
return () => {
specificCompsManager.off('calculationUpdated', callback);
delete this.#subscriptionCallbacks[callbackID];
};
}
#startSharedWorker() {
@ -86,13 +112,6 @@ export default class CompsTelemetryProvider {
// send an initial message to the worker
this.#sharedWorker.port.postMessage({ type: 'init' });
// for testing, try a message adding two numbers
this.#sharedWorker.port.postMessage({
type: 'calculate',
data: [{ a: 1, b: 2 }],
expression: 'a + b'
});
this.#openmct.on('destroy', () => {
this.#sharedWorker.port.close();
});
@ -100,6 +119,15 @@ export default class CompsTelemetryProvider {
onSharedWorkerMessage(event) {
console.log('📝 Shared worker message:', event.data);
const { type, result, callbackID } = event.data;
if (type === 'calculationSubscriptionResult') {
this.#subscriptionCallbacks[callbackID](result);
} else if (type === 'calculationRequestResult') {
this.#requestPromises[callbackID].resolve(result);
delete this.#requestPromises[callbackID];
} else if (type === 'error') {
console.error('❌ Shared worker error:', event.data);
}
}
onSharedWorkerMessageError(event) {

View File

@ -33,33 +33,10 @@ const openmct = inject('openmct');
const domainObject = inject('domainObject');
const compsManagerPool = inject('compsManagerPool');
const compsManager = CompsManager.getCompsManager(domainObject, openmct, compsManagerPool);
const composition = openmct.composition.get(domainObject);
onMounted(() => {
loadComposition();
console.debug('🚀 CompsView: onMounted with compsManager', compsManager);
});
async function loadComposition() {
if (composition) {
composition.on('add', addTelemetryObject);
composition.on('remove', removeTelemetryObject);
await composition.load();
}
}
function addTelemetryObject(object) {
console.debug('📢 CompsView: addTelemetryObject', object);
}
function removeTelemetryObject(object) {
console.debug('❌ CompsView: removeTelemetryObject', object);
}
onBeforeUnmount(() => {
if (composition) {
composition.off('add', addTelemetryObject);
composition.off('remove', removeTelemetryObject);
}
});
onBeforeUnmount(() => {});
</script>

View File

@ -40,7 +40,10 @@ export default function CompsPlugin() {
}
});
openmct.composition.addPolicy((parent, child) => {
return openmct.telemetry.isTelemetryObject(child);
if (parent.type === 'comps' && !openmct.telemetry.isTelemetryObject(child)) {
return false;
}
return true;
});
openmct.telemetry.addProvider(new CompsTelemetryProvider(openmct, compsManagerPool));
openmct.objectViews.addProvider(new CompsViewProvider(openmct, compsManagerPool));