can do derived derived accumulated comps now

This commit is contained in:
Scott Bell 2024-10-09 12:36:20 +02:00
parent 2de628adac
commit ba7f2919c0
2 changed files with 54 additions and 12 deletions

View File

@ -1,5 +1,4 @@
import { EventEmitter } from 'eventemitter3';
import _ from 'lodash';
export default class CompsManager extends EventEmitter {
#openmct;
@ -12,6 +11,8 @@ export default class CompsManager extends EventEmitter {
#loaded = false;
#compositionLoaded = false;
#telemetryProcessors = {};
#loadVersion = 0;
#currentLoadPromise = null;
constructor(openmct, domainObject) {
super();
@ -109,24 +110,61 @@ export default class CompsManager extends EventEmitter {
return this.#loaded;
}
#haveTelemetryOptionsChanged(telemetryOptions) {
return (
telemetryOptions.strategy !== this.#telemetryOptions.strategy ||
telemetryOptions.size !== this.#telemetryOptions.size ||
telemetryOptions.end !== this.#telemetryOptions.end ||
telemetryOptions.start !== this.#telemetryOptions.start ||
telemetryOptions.duration !== this.#telemetryOptions.duration ||
telemetryOptions.filters !== this.#telemetryOptions.filters ||
telemetryOptions.domain !== this.#telemetryOptions.domain
);
}
async load(telemetryOptions) {
if (!_.isEqual(telemetryOptions, this.#telemetryOptions)) {
// Increment the load version to mark a new load operation
const loadVersion = ++this.#loadVersion;
if (this.#haveTelemetryOptionsChanged(telemetryOptions)) {
console.debug(
`😩 Reloading comps manager ${this.#domainObject.name} due to telemetry options change.`,
telemetryOptions
);
this.#destroy();
}
this.#telemetryOptions = telemetryOptions;
if (!this.#compositionLoaded) {
await this.#loadComposition();
this.#compositionLoaded = true;
}
if (!this.#loaded) {
await this.#startListeningToUnderlyingTelemetry();
this.#telemetryLoadedPromises = [];
this.#loaded = true;
}
// Start the load process and store the promise
this.#currentLoadPromise = (async () => {
// Load composition if not already loaded
if (!this.#compositionLoaded) {
await this.#loadComposition();
// Check if a newer load has been initiated
if (loadVersion !== this.#loadVersion) {
return;
}
this.#compositionLoaded = true;
}
// Start listening to telemetry if not already done
if (!this.#loaded) {
await this.#startListeningToUnderlyingTelemetry();
// Check again for newer load
if (loadVersion !== this.#loadVersion) {
return;
}
this.#loaded = true;
console.debug(
`✅ Comps manager ${this.#domainObject.name} is ready.`,
this.#telemetryCollections
);
}
})();
// Await the load process
await this.#currentLoadPromise;
}
async #startListeningToUnderlyingTelemetry() {

View File

@ -84,7 +84,11 @@ function calculate(dataFrame, parameters, expression) {
accumulatedData[referenceParameter.name].push(referenceValue);
referenceValue = accumulatedData[referenceParameter.name];
}
if (referenceParameter.sampleSize && referenceParameter.sampleSize > 0) {
if (
referenceParameter.accumulateValues &&
referenceParameter.sampleSize &&
referenceParameter.sampleSize > 0
) {
// enforce sample size by ensuring referenceValue has the latest n elements
// if we don't have at least the sample size, skip this iteration
if (!referenceValue.length || referenceValue.length < referenceParameter.sampleSize) {