diff --git a/src/plugins/comps/CompsManager.js b/src/plugins/comps/CompsManager.js index 045b6159fc..3115ae334f 100644 --- a/src/plugins/comps/CompsManager.js +++ b/src/plugins/comps/CompsManager.js @@ -7,6 +7,7 @@ export default class CompsManager extends EventEmitter { #telemetryObjects = {}; #telemetryCollections = {}; #managerLoadedPromise; + #dataFrame = {}; constructor(openmct, domainObject) { super(); @@ -47,10 +48,9 @@ export default class CompsManager extends EventEmitter { return underlyingTelemetry; } - #telemetryProcessor = (telemetryObjects) => { - // new data! - console.debug(`🎉 new data!`, telemetryObjects); - this.emit('underlyingTelemetryUpdated', telemetryObjects); + #telemetryProcessor = (newTelemetry, keyString) => { + console.debug(`🎉 new data for ${keyString}!`, newTelemetry); + this.emit('underlyingTelemetryUpdated', { [keyString]: newTelemetry }); }; #clearData() { @@ -68,7 +68,9 @@ export default class CompsManager extends EventEmitter { this.#telemetryCollections[keyString] = this.#openmct.telemetry.requestCollection(telemetryObject); - this.#telemetryCollections[keyString].on('add', this.#telemetryProcessor); + this.#telemetryCollections[keyString].on('add', (data) => { + this.#telemetryProcessor(data, keyString); + }); this.#telemetryCollections[keyString].on('clear', this.#clearData); await this.#telemetryCollections[keyString].load(); }; diff --git a/src/plugins/comps/CompsMathWorker.js b/src/plugins/comps/CompsMathWorker.js index 66aaf1cdae..5692538889 100644 --- a/src/plugins/comps/CompsMathWorker.js +++ b/src/plugins/comps/CompsMathWorker.js @@ -6,41 +6,89 @@ onconnect = function (e) { port.onmessage = function (event) { console.debug('🧮 Comps Math Worker message:', event); - const { type, callbackID, telemetryForComps, expression } = event.data; - if (type === 'calculateRequest' || type === 'calculateSubscription') { - try { - // the reply type is different for request and subscription - const replyType = - type === 'calculateRequest' - ? 'calculationRequestResult' - : 'calculationSubscriptionResult'; - const result = calculate(telemetryForComps, expression); - port.postMessage({ type: replyType, callbackID, result }); - } catch (error) { - port.postMessage({ type: 'error', callbackID, error: error.message }); + try { + const { type, callbackID, telemetryForComps, newTelemetry, expression } = event.data; + if (type === 'calculateRequest') { + const result = calculateRequest(telemetryForComps, expression); + port.postMessage({ type: 'calculationRequestResult', callbackID, result }); + } else if (type === 'calculateSubscription') { + const result = calculateSubscription(telemetryForComps, newTelemetry, expression); + if (result.length) { + port.postMessage({ type: 'calculationSubscriptionResult', callbackID, result }); + } + } else if (type === 'init') { + port.postMessage({ type: 'ready' }); + } else { + throw new Error('Invalid message type'); } - } else if (type === 'init') { - port.postMessage({ type: 'ready' }); - } else { - port.postMessage({ type: 'error', callbackID, error: 'Invalid message type' }); + } catch (error) { + port.postMessage({ type: 'error', error }); } }; }; -function calculate(telemetryForComps, expression) { - const dataSet1 = Object.values(telemetryForComps)[0]; - const dataSet2 = Object.values(telemetryForComps)[1]; +function getFullDataFrame(telemetryForComps) { + const dataFrame = {}; + Object.keys(telemetryForComps).forEach((key) => { + const dataSet = telemetryForComps[key]; + const telemetryMap = new Map(dataSet.map((item) => [item.utc, item])); + dataFrame[key] = telemetryMap; + }); + return dataFrame; +} - // Organize data by utc for quick access - const utcMap1 = new Map(dataSet1.map((item) => [item.utc, item])); - const utcMap2 = new Map(dataSet2.map((item) => [item.utc, item])); +function getReducedDataFrame(telemetryForComps, newTelemetry) { + const reducedDataFrame = {}; + const fullDataFrame = getFullDataFrame(telemetryForComps); + // we can assume (due to telemetryCollections) that newTelmetry has at most one key + const newTelemetryKey = Object.keys(newTelemetry)[0]; + const newTelmetryData = newTelemetry[newTelemetryKey]; + // initalize maps for other telemetry + Object.keys(telemetryForComps).forEach((key) => { + if (key !== newTelemetryKey) { + reducedDataFrame[key] = new Map(); + } + }); + reducedDataFrame[newTelemetryKey] = new Map( + Object.values(newTelmetryData).map((item) => { + return [item.utc, item]; + }) + ); + // march through the new telemetry and look for corresponding telemetry in the other dataset + newTelmetryData.forEach((value) => { + const newTelemetryUtc = value.utc; + Object.keys(telemetryForComps).forEach((otherKey) => { + if (otherKey !== newTelemetryKey) { + const otherDataSet = fullDataFrame[otherKey]; + if (otherDataSet.has(newTelemetryUtc)) { + reducedDataFrame[otherKey].set(newTelemetryUtc, otherDataSet.get(newTelemetryUtc)); + } + } + }); + }); + return reducedDataFrame; +} + +function calculateSubscription(telemetryForComps, newTelemetry, expression) { + const dataFrame = getReducedDataFrame(telemetryForComps, newTelemetry); + return calculate(dataFrame, expression); +} + +function calculateRequest(telemetryForComps, expression) { + const dataFrame = getFullDataFrame(telemetryForComps); + return calculate(dataFrame, expression); +} + +function calculate(dataFrame, expression) { const sumResults = []; + // Iterate over the first dataset and check for matching utc in the other dataset + const firstDataSet = Object.values(dataFrame)[0]; + const secondDataSet = Object.values(dataFrame)[1]; - // Iterate over the first dataset and check for matching utc in the second dataset - for (const [utc, item1] of utcMap1.entries()) { - if (utcMap2.has(utc)) { - const item2 = utcMap2.get(utc); + for (const [utc, item1] of firstDataSet.entries()) { + if (secondDataSet.has(utc)) { + const item2 = secondDataSet.get(utc); const output = evaluate(expression, { a: item1.sin, b: item2.sin }); sumResults.push({ utc, output }); } diff --git a/src/plugins/comps/CompsTelemetryProvider.js b/src/plugins/comps/CompsTelemetryProvider.js index 3edb532591..f6750106c4 100644 --- a/src/plugins/comps/CompsTelemetryProvider.js +++ b/src/plugins/comps/CompsTelemetryProvider.js @@ -76,6 +76,18 @@ export default class CompsTelemetryProvider { }); } + #computeOnNewTelemetry(specificCompsManager, newTelemetry, callbackID) { + const expression = specificCompsManager.getExpression(); + const telemetryForComps = specificCompsManager.requestUnderlyingTelemetry(); + this.#sharedWorker.port.postMessage({ + type: 'calculateSubscription', + telemetryForComps, + newTelemetry, + expression, + callbackID + }); + } + subscribe(domainObject, callback) { const specificCompsManager = CompsManager.getCompsManager( domainObject, @@ -85,13 +97,7 @@ export default class CompsTelemetryProvider { const callbackID = this.#getCallbackID(); this.#subscriptionCallbacks[callbackID] = callback; specificCompsManager.on('underlyingTelemetryUpdated', (newTelemetry) => { - const expression = specificCompsManager.getExpression(); - this.#sharedWorker.port.postMessage({ - type: 'calculateSubscription', - telemetryForComps: newTelemetry, - expression, - callbackID - }); + this.#computeOnNewTelemetry(specificCompsManager, newTelemetry, callbackID); }); return () => { specificCompsManager.off('calculationUpdated', callback);