From b7fffeab1cb9d3c5d5d405e6401d6f6c44c2d20d Mon Sep 17 00:00:00 2001 From: David Tsay Date: Mon, 16 Mar 2020 21:57:42 -0700 Subject: [PATCH] conditionManager controls subscriptions --- src/plugins/condition/Condition.js | 22 ++++++-- src/plugins/condition/ConditionManager.js | 24 +++++++++ .../condition/criterion/TelemetryCriterion.js | 54 ++++++------------- 3 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/plugins/condition/Condition.js b/src/plugins/condition/Condition.js index b42c4fc4ac..6c5ebfa92e 100644 --- a/src/plugins/condition/Condition.js +++ b/src/plugins/condition/Condition.js @@ -61,6 +61,21 @@ export default class ConditionClass extends EventEmitter { this.createCriteria(conditionConfiguration.configuration.criteria); } this.trigger = conditionConfiguration.configuration.trigger; + this.unsubscribes = this.getTelemetrySubscriptions().map(id => { + this.on(`subscription:${id}`, this.handleReceivedTelemetry) + }); + } + + handleReceivedTelemetry(datum) { + if (!datum || !datum.id) { + console.log('no data received'); + return; + } + + this.criteria.filter(criterion => criterion.telemetryObjectIdAsString === datum.id) + .forEach(subscribingCriterion => { + subscribingCriterion.emit(`subscription`, datum) + }); } update(conditionConfiguration) { @@ -214,11 +229,8 @@ export default class ConditionClass extends EventEmitter { }); } - subscribe() { - // TODO it looks like on any single criterion update subscriptions fire for all criteria - this.criteria.forEach((criterion) => { - criterion.subscribe(); - }) + getTelemetrySubscriptions() { + return this.criteria.map(criterion => criterion.telemetryObjectIdAsString); } handleConditionUpdated(datum) { diff --git a/src/plugins/condition/ConditionManager.js b/src/plugins/condition/ConditionManager.js index c7906806af..72d10046d3 100644 --- a/src/plugins/condition/ConditionManager.js +++ b/src/plugins/condition/ConditionManager.js @@ -38,6 +38,8 @@ export default class ConditionManager extends EventEmitter { this.stopObservingForChanges = this.openmct.objects.observe(this.conditionSetDomainObject, '*', (newDomainObject) => { this.update(newDomainObject); }); + + this.subscribeToTelemetry(); } load() { @@ -255,6 +257,28 @@ export default class ConditionManager extends EventEmitter { }); } + subscribeToTelemetry() { + this.load().then((endpoints) => { + this.unsubscribes = endpoints.map(endpoint => { + this.openmct.telemetry.subscribe( + endpoint, + this.broadcastTelemetry + .bind(this, this.openmct.objects.makeKeyString(endpoint.identifier))); + }); + }); + } + + broadcastTelemetry(id, datum) { + this.conditionClassCollection.filter(condition => { + return condition.getTelemetrySubscriptions().includes(id); + }).forEach(subscribingCondition => { + subscribingCondition.emit( + `subscription:${id}`, + Object.assign({}, datum, {id: id}) + ); + }); + } + persistConditions() { this.openmct.objects.mutate(this.conditionSetDomainObject, 'configuration.conditionCollection', this.conditionSetDomainObject.configuration.conditionCollection); } diff --git a/src/plugins/condition/criterion/TelemetryCriterion.js b/src/plugins/condition/criterion/TelemetryCriterion.js index f9b2350211..19501f63f2 100644 --- a/src/plugins/condition/criterion/TelemetryCriterion.js +++ b/src/plugins/condition/criterion/TelemetryCriterion.js @@ -44,16 +44,17 @@ export default class TelemetryCriterion extends EventEmitter { this.operation = telemetryDomainObjectDefinition.operation; this.input = telemetryDomainObjectDefinition.input; this.metadata = telemetryDomainObjectDefinition.metadata; - this.subscription = null; - this.telemetryObjectIdAsString = null; - this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)).then((obj) => this.initialize(obj)); + // this.subscription = null; + this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetry); + this.on('subscription', this.handleSubscription); + // this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)).then((obj) => this.initialize(obj)); } - initialize(obj) { - this.telemetryObject = obj; - this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetryObject.identifier); - this.emitEvent('criterionUpdated', this); - } + // initialize(obj) { + // this.telemetryObject = obj; + // this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetryObject.identifier); + // this.emitEvent('criterionUpdated', this); + // } formatData(data) { const datum = { @@ -70,7 +71,9 @@ export default class TelemetryCriterion extends EventEmitter { } handleSubscription(data) { - this.emitEvent('criterionResultUpdated', this.formatData(data)); + if(this.isValid()) { + this.emitEvent('criterionResultUpdated', this.formatData(data)); + } } findOperation(operation) { @@ -106,7 +109,7 @@ export default class TelemetryCriterion extends EventEmitter { } isValid() { - return this.telemetryObject && this.metadata && this.operation; + return this.metadata && this.operation; } requestLAD(options) { @@ -120,7 +123,7 @@ export default class TelemetryCriterion extends EventEmitter { return this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)) .then((obj) => { - if (!obj || !this.metadata || !this.operation) { + if (!obj || !this.isValid()) { return this.formatData({}); } return this.telemetryAPI.request( @@ -136,36 +139,9 @@ export default class TelemetryCriterion extends EventEmitter { }); } - /** - * Subscribes to the telemetry object and returns an unsubscribe function - * If the telemetry is not valid, returns nothing - */ - subscribe() { - if (this.isValid()) { - this.unsubscribe(); - this.subscription = this.telemetryAPI.subscribe(this.telemetryObject, (datum) => { - this.handleSubscription(datum); - }); - } else { - this.handleSubscription(); - } - } - - /** - * Calls an unsubscribe function returned by subscribe() and deletes any initialized data - */ - unsubscribe() { - //unsubscribe from telemetry source - if (typeof this.subscription === 'function') { - this.subscription(); - } - delete this.subscription; - } - destroy() { - this.unsubscribe(); + this.off('receivedTelemetry', this.handleSubscription); this.emitEvent('criterionRemoved'); delete this.telemetryObjectIdAsString; - delete this.telemetryObject; } }