exact values enabled

This commit is contained in:
Scott Bell 2024-08-05 16:26:54 +02:00
parent 69db534042
commit 90a24b380f
3 changed files with 94 additions and 38 deletions

View File

@ -7,6 +7,7 @@ export default class CompsManager extends EventEmitter {
#telemetryObjects = {};
#telemetryCollections = {};
#managerLoadedPromise;
#dataFrame = {};
constructor(openmct, domainObject) {
super();
@ -47,10 +48,9 @@ export default class CompsManager extends EventEmitter {
return underlyingTelemetry;
}
#telemetryProcessor = (telemetryObjects) => {
// new data!
console.debug(`🎉 new data!`, telemetryObjects);
this.emit('underlyingTelemetryUpdated', telemetryObjects);
#telemetryProcessor = (newTelemetry, keyString) => {
console.debug(`🎉 new data for ${keyString}!`, newTelemetry);
this.emit('underlyingTelemetryUpdated', { [keyString]: newTelemetry });
};
#clearData() {
@ -68,7 +68,9 @@ export default class CompsManager extends EventEmitter {
this.#telemetryCollections[keyString] =
this.#openmct.telemetry.requestCollection(telemetryObject);
this.#telemetryCollections[keyString].on('add', this.#telemetryProcessor);
this.#telemetryCollections[keyString].on('add', (data) => {
this.#telemetryProcessor(data, keyString);
});
this.#telemetryCollections[keyString].on('clear', this.#clearData);
await this.#telemetryCollections[keyString].load();
};

View File

@ -6,41 +6,89 @@ onconnect = function (e) {
port.onmessage = function (event) {
console.debug('🧮 Comps Math Worker message:', event);
const { type, callbackID, telemetryForComps, expression } = event.data;
if (type === 'calculateRequest' || type === 'calculateSubscription') {
try {
// the reply type is different for request and subscription
const replyType =
type === 'calculateRequest'
? 'calculationRequestResult'
: 'calculationSubscriptionResult';
const result = calculate(telemetryForComps, expression);
port.postMessage({ type: replyType, callbackID, result });
} catch (error) {
port.postMessage({ type: 'error', callbackID, error: error.message });
try {
const { type, callbackID, telemetryForComps, newTelemetry, expression } = event.data;
if (type === 'calculateRequest') {
const result = calculateRequest(telemetryForComps, expression);
port.postMessage({ type: 'calculationRequestResult', callbackID, result });
} else if (type === 'calculateSubscription') {
const result = calculateSubscription(telemetryForComps, newTelemetry, expression);
if (result.length) {
port.postMessage({ type: 'calculationSubscriptionResult', callbackID, result });
}
} else if (type === 'init') {
port.postMessage({ type: 'ready' });
} else {
throw new Error('Invalid message type');
}
} else if (type === 'init') {
port.postMessage({ type: 'ready' });
} else {
port.postMessage({ type: 'error', callbackID, error: 'Invalid message type' });
} catch (error) {
port.postMessage({ type: 'error', error });
}
};
};
function calculate(telemetryForComps, expression) {
const dataSet1 = Object.values(telemetryForComps)[0];
const dataSet2 = Object.values(telemetryForComps)[1];
function getFullDataFrame(telemetryForComps) {
const dataFrame = {};
Object.keys(telemetryForComps).forEach((key) => {
const dataSet = telemetryForComps[key];
const telemetryMap = new Map(dataSet.map((item) => [item.utc, item]));
dataFrame[key] = telemetryMap;
});
return dataFrame;
}
// Organize data by utc for quick access
const utcMap1 = new Map(dataSet1.map((item) => [item.utc, item]));
const utcMap2 = new Map(dataSet2.map((item) => [item.utc, item]));
function getReducedDataFrame(telemetryForComps, newTelemetry) {
const reducedDataFrame = {};
const fullDataFrame = getFullDataFrame(telemetryForComps);
// we can assume (due to telemetryCollections) that newTelmetry has at most one key
const newTelemetryKey = Object.keys(newTelemetry)[0];
const newTelmetryData = newTelemetry[newTelemetryKey];
// initalize maps for other telemetry
Object.keys(telemetryForComps).forEach((key) => {
if (key !== newTelemetryKey) {
reducedDataFrame[key] = new Map();
}
});
reducedDataFrame[newTelemetryKey] = new Map(
Object.values(newTelmetryData).map((item) => {
return [item.utc, item];
})
);
// march through the new telemetry and look for corresponding telemetry in the other dataset
newTelmetryData.forEach((value) => {
const newTelemetryUtc = value.utc;
Object.keys(telemetryForComps).forEach((otherKey) => {
if (otherKey !== newTelemetryKey) {
const otherDataSet = fullDataFrame[otherKey];
if (otherDataSet.has(newTelemetryUtc)) {
reducedDataFrame[otherKey].set(newTelemetryUtc, otherDataSet.get(newTelemetryUtc));
}
}
});
});
return reducedDataFrame;
}
function calculateSubscription(telemetryForComps, newTelemetry, expression) {
const dataFrame = getReducedDataFrame(telemetryForComps, newTelemetry);
return calculate(dataFrame, expression);
}
function calculateRequest(telemetryForComps, expression) {
const dataFrame = getFullDataFrame(telemetryForComps);
return calculate(dataFrame, expression);
}
function calculate(dataFrame, expression) {
const sumResults = [];
// Iterate over the first dataset and check for matching utc in the other dataset
const firstDataSet = Object.values(dataFrame)[0];
const secondDataSet = Object.values(dataFrame)[1];
// Iterate over the first dataset and check for matching utc in the second dataset
for (const [utc, item1] of utcMap1.entries()) {
if (utcMap2.has(utc)) {
const item2 = utcMap2.get(utc);
for (const [utc, item1] of firstDataSet.entries()) {
if (secondDataSet.has(utc)) {
const item2 = secondDataSet.get(utc);
const output = evaluate(expression, { a: item1.sin, b: item2.sin });
sumResults.push({ utc, output });
}

View File

@ -76,6 +76,18 @@ export default class CompsTelemetryProvider {
});
}
#computeOnNewTelemetry(specificCompsManager, newTelemetry, callbackID) {
const expression = specificCompsManager.getExpression();
const telemetryForComps = specificCompsManager.requestUnderlyingTelemetry();
this.#sharedWorker.port.postMessage({
type: 'calculateSubscription',
telemetryForComps,
newTelemetry,
expression,
callbackID
});
}
subscribe(domainObject, callback) {
const specificCompsManager = CompsManager.getCompsManager(
domainObject,
@ -85,13 +97,7 @@ export default class CompsTelemetryProvider {
const callbackID = this.#getCallbackID();
this.#subscriptionCallbacks[callbackID] = callback;
specificCompsManager.on('underlyingTelemetryUpdated', (newTelemetry) => {
const expression = specificCompsManager.getExpression();
this.#sharedWorker.port.postMessage({
type: 'calculateSubscription',
telemetryForComps: newTelemetry,
expression,
callbackID
});
this.#computeOnNewTelemetry(specificCompsManager, newTelemetry, callbackID);
});
return () => {
specificCompsManager.off('calculationUpdated', callback);