diff --git a/src/plugins/comps/CompsManager.js b/src/plugins/comps/CompsManager.js index 1c011756d4..e4b7a480f3 100644 --- a/src/plugins/comps/CompsManager.js +++ b/src/plugins/comps/CompsManager.js @@ -3,46 +3,79 @@ import { EventEmitter } from 'eventemitter3'; export default class CompsManager extends EventEmitter { #openmct; #domainObject; + #composition; #telemetryObjects = {}; #telemetryCollections = {}; + #managerLoadedPromise; constructor(openmct, domainObject) { super(); this.#openmct = openmct; this.#domainObject = domainObject; + + // Load the composition + this.#managerLoadedPromise = this.#loadComposition(); } - #removeTelemetryObject(telemetryObject) { + load() { + return this.#managerLoadedPromise; + } + + async #loadComposition() { + this.#composition = this.#openmct.composition.get(this.#domainObject); + if (this.#composition) { + this.#composition.on('add', this.#addTelemetryObject); + this.#composition.on('remove', this.#removeTelemetryObject); + await this.#composition.load(); + } + } + + #removeTelemetryObject = (telemetryObject) => { + console.debug('❌ CompsManager: removeTelemetryObject', telemetryObject); const keyString = this.#openmct.objects.makeKeyString(telemetryObject.identifier); delete this.#telemetryObjects[keyString]; this.#telemetryCollections[keyString]?.destroy(); delete this.#telemetryCollections[keyString]; + }; + + requestUnderlyingTelemetry() { + const underlyingTelemetry = {}; + Object.keys(this.#telemetryCollections).forEach((collectionKey) => { + const collection = this.#telemetryCollections[collectionKey]; + underlyingTelemetry[collectionKey] = collection.getAll(); + }); + return underlyingTelemetry; } - telemetryProcessor(telemetryObjects) { - console.debug('Telemetry Processor', telemetryObjects); + #telemetryProcessor(telemetryObjects) { + this.emit('underlyingTelemetryUpdated', telemetryObjects); } - clearData() { + #clearData() { console.debug('Clear Data'); } - #addTelemetryObject(telemetryObject) { + getExpression() { + return 'a + b'; + } + + #addTelemetryObject = (telemetryObject) => { + console.debug('📢 CompsManager: #addTelemetryObject', telemetryObject); const keyString = this.#openmct.objects.makeKeyString(telemetryObject.identifier); this.#telemetryObjects[keyString] = telemetryObject; this.#telemetryCollections[keyString] = this.#openmct.telemetry.requestCollection(telemetryObject); - this.#telemetryCollections[keyString].on('add', this.telemetryProcessor); - this.#telemetryCollections[keyString].on('clear', this.clearData); + this.#telemetryCollections[keyString].on('add', this.#telemetryProcessor); + this.#telemetryCollections[keyString].on('clear', this.#clearData); this.#telemetryCollections[keyString].load(); - } + }; static getCompsManager(domainObject, openmct, compsManagerPool) { const id = openmct.objects.makeKeyString(domainObject.identifier); if (!compsManagerPool[id]) { - compsManagerPool[id] = new CompsManager(domainObject, this.openmct); + compsManagerPool[id] = new CompsManager(openmct, domainObject); } return compsManagerPool[id]; diff --git a/src/plugins/comps/CompsMathWorker.js b/src/plugins/comps/CompsMathWorker.js index 1df64c0470..8f2f886773 100644 --- a/src/plugins/comps/CompsMathWorker.js +++ b/src/plugins/comps/CompsMathWorker.js @@ -6,21 +6,26 @@ onconnect = function (e) { port.onmessage = function (event) { console.debug('🧮 Comps Math Worker message:', event); - const { type, id, data, expression } = event.data; - if (type === 'calculate') { + const { type, callbackID, telemetryForComps, expression } = event.data; + if (type === 'calculateRequest' || type === 'calculateSubscription') { try { - const result = data.map((point) => { + // the reply type is different for request and subscription + const replyType = + type === 'calculateRequest' + ? 'calculationRequestResult' + : 'calculationSubscriptionResult'; + const result = telemetryForComps.map((point) => { // Using Math.js to evaluate the expression against the data return { ...point, value: evaluate(expression, point) }; }); - port.postMessage({ type: 'response', id, result }); + port.postMessage({ type: replyType, callbackID, result }); } catch (error) { - port.postMessage({ type: 'error', id, error: error.message }); + port.postMessage({ type: 'error', callbackID, error: error.message }); } } else if (type === 'init') { port.postMessage({ type: 'ready' }); } else { - port.postMessage({ type: 'error', id, error: 'Invalid message type' }); + port.postMessage({ type: 'error', callbackID, error: 'Invalid message type' }); } }; }; diff --git a/src/plugins/comps/CompsTelemetryProvider.js b/src/plugins/comps/CompsTelemetryProvider.js index 3fb24ac536..10fbbacdd0 100644 --- a/src/plugins/comps/CompsTelemetryProvider.js +++ b/src/plugins/comps/CompsTelemetryProvider.js @@ -19,23 +19,20 @@ * this source code distribution or the Licensing information page available * at runtime from the About dialog for additional information. *****************************************************************************/ +import CompsManager from './CompsManager.js'; export default class CompsTelemetryProvider { - #telemetryObjects = {}; - #telemetryCollections = {}; - #composition = []; #openmct = null; #sharedWorker = null; + #compsManagerPool = null; + #lastUniqueID = 1; + #requestPromises = {}; + #subscriptionCallbacks = {}; - constructor(openmct) { + constructor(openmct, compsManagerPool) { this.#openmct = openmct; - } - - #removeTelemetryObject(telemetryObject) { - const keyString = this.openmct.objects.makeKeyString(telemetryObject.identifier); - delete this.#telemetryObjects[keyString]; - this.#telemetryCollections[keyString]?.destroy(); - delete this.#telemetryCollections[keyString]; + this.#compsManagerPool = compsManagerPool; + this.#startSharedWorker(); } isTelemetryObject(domainObject) { @@ -50,26 +47,55 @@ export default class CompsTelemetryProvider { return domainObject.type === 'comps'; } + #getCallbackID() { + return this.#lastUniqueID++; + } + // eslint-disable-next-line require-await async request(domainObject, options) { - // get the telemetry from the collections - const telmetryToSend = {}; - Object.keys(this.#telemetryCollections).forEach((keyString) => { - telmetryToSend[keyString] = this.#telemetryCollections[keyString].getAll( - domainObject, - options - ); - }); - this.#sharedWorker.port.postMessage({ - type: 'calculate', - data: telmetryToSend, - expression: 'a + b' + const specificCompsManager = CompsManager.getCompsManager( + domainObject, + this.#openmct, + this.#compsManagerPool + ); + await specificCompsManager.load(); + const telemetryForComps = specificCompsManager.requestUnderlyingTelemetry(); + console.debug('🏟️ Telemetry for comps:', telemetryForComps); + const expression = specificCompsManager.getExpression(); + // need to create callbackID with a promise for future execution + return new Promise((resolve, reject) => { + const callbackID = this.#getCallbackID(); + this.#requestPromises[callbackID] = { resolve, reject }; + this.#sharedWorker.port.postMessage({ + type: 'calculateRequest', + telemetryForComps, + expression, + callbackID + }); }); } subscribe(domainObject, callback) { - // TODO: add to listener list and return a function to remove it - return () => {}; + const specificCompsManager = CompsManager.getCompsManager( + domainObject, + this.#openmct, + this.#compsManagerPool + ); + const callbackID = this.#getCallbackID(); + this.#subscriptionCallbacks[callbackID] = callback; + specificCompsManager.on('underlyingTelemetryUpdated', (newTelemetry) => { + const expression = specificCompsManager.getExpression(); + this.#sharedWorker.port.postMessage({ + type: 'calculateSubscription', + telemetryForComps: newTelemetry, + expression, + callbackID + }); + }); + return () => { + specificCompsManager.off('calculationUpdated', callback); + delete this.#subscriptionCallbacks[callbackID]; + }; } #startSharedWorker() { @@ -86,13 +112,6 @@ export default class CompsTelemetryProvider { // send an initial message to the worker this.#sharedWorker.port.postMessage({ type: 'init' }); - // for testing, try a message adding two numbers - this.#sharedWorker.port.postMessage({ - type: 'calculate', - data: [{ a: 1, b: 2 }], - expression: 'a + b' - }); - this.#openmct.on('destroy', () => { this.#sharedWorker.port.close(); }); @@ -100,6 +119,15 @@ export default class CompsTelemetryProvider { onSharedWorkerMessage(event) { console.log('📝 Shared worker message:', event.data); + const { type, result, callbackID } = event.data; + if (type === 'calculationSubscriptionResult') { + this.#subscriptionCallbacks[callbackID](result); + } else if (type === 'calculationRequestResult') { + this.#requestPromises[callbackID].resolve(result); + delete this.#requestPromises[callbackID]; + } else if (type === 'error') { + console.error('❌ Shared worker error:', event.data); + } } onSharedWorkerMessageError(event) { diff --git a/src/plugins/comps/components/CompsView.vue b/src/plugins/comps/components/CompsView.vue index a95bf7086b..3118fc1c27 100644 --- a/src/plugins/comps/components/CompsView.vue +++ b/src/plugins/comps/components/CompsView.vue @@ -33,33 +33,10 @@ const openmct = inject('openmct'); const domainObject = inject('domainObject'); const compsManagerPool = inject('compsManagerPool'); const compsManager = CompsManager.getCompsManager(domainObject, openmct, compsManagerPool); -const composition = openmct.composition.get(domainObject); onMounted(() => { - loadComposition(); console.debug('🚀 CompsView: onMounted with compsManager', compsManager); }); -async function loadComposition() { - if (composition) { - composition.on('add', addTelemetryObject); - composition.on('remove', removeTelemetryObject); - await composition.load(); - } -} - -function addTelemetryObject(object) { - console.debug('📢 CompsView: addTelemetryObject', object); -} - -function removeTelemetryObject(object) { - console.debug('❌ CompsView: removeTelemetryObject', object); -} - -onBeforeUnmount(() => { - if (composition) { - composition.off('add', addTelemetryObject); - composition.off('remove', removeTelemetryObject); - } -}); +onBeforeUnmount(() => {}); diff --git a/src/plugins/comps/plugin.js b/src/plugins/comps/plugin.js index a3bf5590a4..a1119132a0 100644 --- a/src/plugins/comps/plugin.js +++ b/src/plugins/comps/plugin.js @@ -40,7 +40,10 @@ export default function CompsPlugin() { } }); openmct.composition.addPolicy((parent, child) => { - return openmct.telemetry.isTelemetryObject(child); + if (parent.type === 'comps' && !openmct.telemetry.isTelemetryObject(child)) { + return false; + } + return true; }); openmct.telemetry.addProvider(new CompsTelemetryProvider(openmct, compsManagerPool)); openmct.objectViews.addProvider(new CompsViewProvider(openmct, compsManagerPool));