conditionManager controls subscriptions

This commit is contained in:
David Tsay 2020-03-16 21:57:42 -07:00
parent 84f0d49d6f
commit b7fffeab1c
3 changed files with 56 additions and 44 deletions

View File

@ -61,6 +61,21 @@ export default class ConditionClass extends EventEmitter {
this.createCriteria(conditionConfiguration.configuration.criteria);
}
this.trigger = conditionConfiguration.configuration.trigger;
this.unsubscribes = this.getTelemetrySubscriptions().map(id => {
this.on(`subscription:${id}`, this.handleReceivedTelemetry)
});
}
handleReceivedTelemetry(datum) {
if (!datum || !datum.id) {
console.log('no data received');
return;
}
this.criteria.filter(criterion => criterion.telemetryObjectIdAsString === datum.id)
.forEach(subscribingCriterion => {
subscribingCriterion.emit(`subscription`, datum)
});
}
update(conditionConfiguration) {
@ -214,11 +229,8 @@ export default class ConditionClass extends EventEmitter {
});
}
subscribe() {
// TODO it looks like on any single criterion update subscriptions fire for all criteria
this.criteria.forEach((criterion) => {
criterion.subscribe();
})
getTelemetrySubscriptions() {
return this.criteria.map(criterion => criterion.telemetryObjectIdAsString);
}
handleConditionUpdated(datum) {

View File

@ -38,6 +38,8 @@ export default class ConditionManager extends EventEmitter {
this.stopObservingForChanges = this.openmct.objects.observe(this.conditionSetDomainObject, '*', (newDomainObject) => {
this.update(newDomainObject);
});
this.subscribeToTelemetry();
}
load() {
@ -255,6 +257,28 @@ export default class ConditionManager extends EventEmitter {
});
}
subscribeToTelemetry() {
this.load().then((endpoints) => {
this.unsubscribes = endpoints.map(endpoint => {
this.openmct.telemetry.subscribe(
endpoint,
this.broadcastTelemetry
.bind(this, this.openmct.objects.makeKeyString(endpoint.identifier)));
});
});
}
broadcastTelemetry(id, datum) {
this.conditionClassCollection.filter(condition => {
return condition.getTelemetrySubscriptions().includes(id);
}).forEach(subscribingCondition => {
subscribingCondition.emit(
`subscription:${id}`,
Object.assign({}, datum, {id: id})
);
});
}
persistConditions() {
this.openmct.objects.mutate(this.conditionSetDomainObject, 'configuration.conditionCollection', this.conditionSetDomainObject.configuration.conditionCollection);
}

View File

@ -44,16 +44,17 @@ export default class TelemetryCriterion extends EventEmitter {
this.operation = telemetryDomainObjectDefinition.operation;
this.input = telemetryDomainObjectDefinition.input;
this.metadata = telemetryDomainObjectDefinition.metadata;
this.subscription = null;
this.telemetryObjectIdAsString = null;
this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)).then((obj) => this.initialize(obj));
// this.subscription = null;
this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetry);
this.on('subscription', this.handleSubscription);
// this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)).then((obj) => this.initialize(obj));
}
initialize(obj) {
this.telemetryObject = obj;
this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetryObject.identifier);
this.emitEvent('criterionUpdated', this);
}
// initialize(obj) {
// this.telemetryObject = obj;
// this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetryObject.identifier);
// this.emitEvent('criterionUpdated', this);
// }
formatData(data) {
const datum = {
@ -70,7 +71,9 @@ export default class TelemetryCriterion extends EventEmitter {
}
handleSubscription(data) {
this.emitEvent('criterionResultUpdated', this.formatData(data));
if(this.isValid()) {
this.emitEvent('criterionResultUpdated', this.formatData(data));
}
}
findOperation(operation) {
@ -106,7 +109,7 @@ export default class TelemetryCriterion extends EventEmitter {
}
isValid() {
return this.telemetryObject && this.metadata && this.operation;
return this.metadata && this.operation;
}
requestLAD(options) {
@ -120,7 +123,7 @@ export default class TelemetryCriterion extends EventEmitter {
return this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry))
.then((obj) => {
if (!obj || !this.metadata || !this.operation) {
if (!obj || !this.isValid()) {
return this.formatData({});
}
return this.telemetryAPI.request(
@ -136,36 +139,9 @@ export default class TelemetryCriterion extends EventEmitter {
});
}
/**
* Subscribes to the telemetry object and returns an unsubscribe function
* If the telemetry is not valid, returns nothing
*/
subscribe() {
if (this.isValid()) {
this.unsubscribe();
this.subscription = this.telemetryAPI.subscribe(this.telemetryObject, (datum) => {
this.handleSubscription(datum);
});
} else {
this.handleSubscription();
}
}
/**
* Calls an unsubscribe function returned by subscribe() and deletes any initialized data
*/
unsubscribe() {
//unsubscribe from telemetry source
if (typeof this.subscription === 'function') {
this.subscription();
}
delete this.subscription;
}
destroy() {
this.unsubscribe();
this.off('receivedTelemetry', this.handleSubscription);
this.emitEvent('criterionRemoved');
delete this.telemetryObjectIdAsString;
delete this.telemetryObject;
}
}