subscriptions work

This commit is contained in:
Scott Bell 2024-08-06 15:50:24 +02:00
parent 90a24b380f
commit 37d222fd87
4 changed files with 111 additions and 40 deletions

View File

@ -31,6 +31,42 @@ export default class CompsManager extends EventEmitter {
}
}
getFullDataFrame(newTelemetry) {
const dataFrame = {};
// can assume on data item
const newTelemetryKey = Object.keys(newTelemetry)[0];
const newTelemetryData = newTelemetry[newTelemetryKey];
const otherTelemetryKeys = Object.keys(this.#telemetryCollections).filter(
(keyString) => keyString !== newTelemetryKey
);
// initialize the data frame with the new telemetry data
dataFrame[newTelemetryKey] = newTelemetryData;
// initialize the other telemetry data
otherTelemetryKeys.forEach((keyString) => {
dataFrame[keyString] = [];
});
// march through the new telemetry data and add data to the frame from the other telemetry objects
// using LOCF
newTelemetryData.forEach((newDatum) => {
otherTelemetryKeys.forEach((otherKeyString) => {
const otherCollection = this.#telemetryCollections[otherKeyString];
let insertionPointForNewData = otherCollection._sortedIndex(newDatum);
const otherCollectionData = otherCollection.getAll();
if (insertionPointForNewData && insertionPointForNewData >= otherCollectionData.length) {
insertionPointForNewData = otherCollectionData.length - 1;
}
// get the closest datum to the new datum
const closestDatum = otherCollectionData[insertionPointForNewData];
if (closestDatum) {
dataFrame[otherKeyString].push(closestDatum);
}
});
});
return dataFrame;
}
#removeTelemetryObject = (telemetryObject) => {
console.debug('❌ CompsManager: removeTelemetryObject', telemetryObject);
const keyString = this.#openmct.objects.makeKeyString(telemetryObject.identifier);

View File

@ -7,12 +7,12 @@ onconnect = function (e) {
port.onmessage = function (event) {
console.debug('🧮 Comps Math Worker message:', event);
try {
const { type, callbackID, telemetryForComps, newTelemetry, expression } = event.data;
const { type, callbackID, telemetryForComps, expression } = event.data;
if (type === 'calculateRequest') {
const result = calculateRequest(telemetryForComps, expression);
port.postMessage({ type: 'calculationRequestResult', callbackID, result });
} else if (type === 'calculateSubscription') {
const result = calculateSubscription(telemetryForComps, newTelemetry, expression);
const result = calculateSubscription(telemetryForComps, expression);
if (result.length) {
port.postMessage({ type: 'calculationSubscriptionResult', callbackID, result });
}
@ -37,41 +37,8 @@ function getFullDataFrame(telemetryForComps) {
return dataFrame;
}
function getReducedDataFrame(telemetryForComps, newTelemetry) {
const reducedDataFrame = {};
const fullDataFrame = getFullDataFrame(telemetryForComps);
// we can assume (due to telemetryCollections) that newTelmetry has at most one key
const newTelemetryKey = Object.keys(newTelemetry)[0];
const newTelmetryData = newTelemetry[newTelemetryKey];
// initalize maps for other telemetry
Object.keys(telemetryForComps).forEach((key) => {
if (key !== newTelemetryKey) {
reducedDataFrame[key] = new Map();
}
});
reducedDataFrame[newTelemetryKey] = new Map(
Object.values(newTelmetryData).map((item) => {
return [item.utc, item];
})
);
// march through the new telemetry and look for corresponding telemetry in the other dataset
newTelmetryData.forEach((value) => {
const newTelemetryUtc = value.utc;
Object.keys(telemetryForComps).forEach((otherKey) => {
if (otherKey !== newTelemetryKey) {
const otherDataSet = fullDataFrame[otherKey];
if (otherDataSet.has(newTelemetryUtc)) {
reducedDataFrame[otherKey].set(newTelemetryUtc, otherDataSet.get(newTelemetryUtc));
}
}
});
});
return reducedDataFrame;
}
function calculateSubscription(telemetryForComps, newTelemetry, expression) {
const dataFrame = getReducedDataFrame(telemetryForComps, newTelemetry);
function calculateSubscription(telemetryForComps, expression) {
const dataFrame = getFullDataFrame(telemetryForComps);
return calculate(dataFrame, expression);
}

View File

@ -0,0 +1,68 @@
/*****************************************************************************
* Open MCT, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
export default class CompsMetadataProvider {
constructor(openmct) {
this.openmct = openmct;
}
supportsMetadata(domainObject) {
return domainObject.type === 'comps';
}
getDomains(domainObject) {
return this.openmct.time.getAllTimeSystems().map(function (ts, i) {
return {
key: ts.key,
name: ts.name,
format: ts.timeFormat,
hints: {
domain: i
}
};
});
}
getMetadata(domainObject) {
return {
values: this.getDomains().concat([
{
key: 'output',
source: 'output',
name: 'Output',
formatString: '%0.2f',
hints: {
range: 1
}
},
{
key: 'utc',
name: 'Time',
format: 'utc',
hints: {
domain: 1
}
}
])
};
}
}

View File

@ -78,11 +78,11 @@ export default class CompsTelemetryProvider {
#computeOnNewTelemetry(specificCompsManager, newTelemetry, callbackID) {
const expression = specificCompsManager.getExpression();
const telemetryForComps = specificCompsManager.requestUnderlyingTelemetry();
const telemetryForComps = specificCompsManager.getFullDataFrame(newTelemetry);
console.debug('🏟️ created new Data frame:', telemetryForComps);
this.#sharedWorker.port.postMessage({
type: 'calculateSubscription',
telemetryForComps,
newTelemetry,
expression,
callbackID
});
@ -127,7 +127,7 @@ export default class CompsTelemetryProvider {
onSharedWorkerMessage(event) {
console.log('📝 Shared worker message:', event.data);
const { type, result, callbackID } = event.data;
if (type === 'calculationSubscriptionResult') {
if (type === 'calculationSubscriptionResult' && this.#subscriptionCallbacks[callbackID]) {
this.#subscriptionCallbacks[callbackID](result);
} else if (type === 'calculationRequestResult') {
this.#requestPromises[callbackID].resolve(result);