Merge pull request #2759 from nasa/dave/conditions-telemetry

[Conditions] Request, Subscribe to, and Provide Telemetry
This commit is contained in:
Shefali Joshi 2020-03-24 11:43:09 -07:00 committed by GitHub
commit 2c1b4b4cfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 276 additions and 95 deletions

View File

@ -48,18 +48,32 @@ export default class ConditionClass extends EventEmitter {
* @param conditionConfiguration: {id: uuid,trigger: enum, criteria: Array of {id: uuid, operation: enum, input: Array, metaDataKey: string, key: {domainObject.identifier} }
* @param openmct
*/
constructor(conditionConfiguration, openmct) {
constructor(conditionConfiguration, openmct, conditionManager) {
super();
this.openmct = openmct;
this.conditionManager = conditionManager;
this.id = conditionConfiguration.id;
this.criteria = [];
this.criteriaResults = {};
this.result = undefined;
this.latestTimestamp = {};
if (conditionConfiguration.configuration.criteria) {
this.createCriteria(conditionConfiguration.configuration.criteria);
}
this.trigger = conditionConfiguration.configuration.trigger;
this.conditionManager.on('broadcastTelemetry', this.handleBroadcastTelemetry, this);
}
handleBroadcastTelemetry(datum) {
if (!datum || !datum.id) {
console.log('no data received');
return;
}
this.criteria.forEach(criterion => {
criterion.emit(`subscription:${datum.id}`, datum);
});
}
update(conditionConfiguration) {
@ -172,7 +186,6 @@ export default class ConditionClass extends EventEmitter {
let found = this.findCriterion(criterion.id);
if (found) {
this.criteria[found.index] = criterion.data;
this.subscribe();
// TODO nothing is listening to this
this.emitEvent('conditionUpdated', {
trigger: this.trigger,
@ -181,21 +194,40 @@ export default class ConditionClass extends EventEmitter {
}
}
handleCriterionResult(eventData) {
updateCriteriaResults(eventData) {
const id = eventData.id;
if (this.findCriterion(id)) {
this.criteriaResults[id] = eventData.data.result;
}
}
handleCriterionResult(eventData) {
this.updateCriteriaResults(eventData);
this.handleConditionUpdated(eventData.data);
}
subscribe() {
// TODO it looks like on any single criterion update subscriptions fire for all criteria
this.criteria.forEach((criterion) => {
criterion.subscribe();
})
requestLADConditionResult() {
const criteriaResults = this.criteria
.map(criterion => criterion.requestLAD());
return Promise.all(criteriaResults)
.then(results => {
results.forEach(result => {
this.updateCriteriaResults(result);
this.latestTimestamp = this.getLatestTimestamp(this.latestTimestamp, result.data)
});
this.evaluate();
return {
id: this.id,
data: Object.assign({}, this.latestTimestamp, { result: this.result })
}
});
}
getTelemetrySubscriptions() {
return this.criteria.map(criterion => criterion.telemetryObjectIdAsString);
}
handleConditionUpdated(datum) {
@ -223,6 +255,19 @@ export default class ConditionClass extends EventEmitter {
this.result = computeCondition(this.criteriaResults, this.trigger === TRIGGER.ALL);
}
getLatestTimestamp(current, compare) {
const timestamp = Object.assign({}, current);
this.openmct.time.getAllTimeSystems().forEach(timeSystem => {
if (!timestamp[timeSystem.key]
|| compare[timeSystem.key] > timestamp[timeSystem.key]
) {
timestamp[timeSystem.key] = compare[timeSystem.key];
}
});
return timestamp;
}
emitEvent(eventName, data) {
this.emit(eventName, {
id: this.id,
@ -231,6 +276,7 @@ export default class ConditionClass extends EventEmitter {
}
destroy() {
this.conditionManager.off('broadcastTelemetry', this.handleBroadcastTelemetry, this);
if (typeof this.stopObservingForChanges === 'function') {
this.stopObservingForChanges();
}

View File

@ -31,6 +31,11 @@ export default class ConditionManager extends EventEmitter {
this.conditionSetDomainObject = conditionSetDomainObject;
this.timeAPI = this.openmct.time;
this.latestTimestamp = {};
this.composition = this.openmct.composition.get(conditionSetDomainObject);
this.composition.on('add', this.subscribeToTelemetry, this);
this.composition.on('remove', this.unsubscribeFromTelemetry, this);
this.compositionLoad = this.composition.load();
this.subscriptions = {};
this.initialize();
this.stopObservingForChanges = this.openmct.objects.observe(this.conditionSetDomainObject, '*', (newDomainObject) => {
@ -38,6 +43,30 @@ export default class ConditionManager extends EventEmitter {
});
}
subscribeToTelemetry(endpoint) {
const id = this.openmct.objects.makeKeyString(endpoint.identifier);
if (this.subscriptions[id]) {
console.log('subscription already exists');
return;
}
this.subscriptions[id] = this.openmct.telemetry.subscribe(
endpoint,
this.broadcastTelemetry.bind(this, id)
);
}
unsubscribeFromTelemetry(endpointIdentifier) {
const id = this.openmct.objects.makeKeyString(endpointIdentifier);
if (!this.subscriptions[id]) {
console.log('no subscription to remove');
return;
}
this.subscriptions[id]();
delete this.subscriptions[id];
}
initialize() {
this.conditionResults = {};
this.conditionClassCollection = [];
@ -56,18 +85,13 @@ export default class ConditionManager extends EventEmitter {
}
initCondition(conditionConfiguration, index) {
let condition = new Condition(conditionConfiguration, this.openmct);
let condition = new Condition(conditionConfiguration, this.openmct, this);
condition.on('conditionResultUpdated', this.handleConditionResult.bind(this));
if (index !== undefined) {
this.conditionClassCollection.splice(index + 1, 0, condition);
} else {
this.conditionClassCollection.unshift(condition);
}
//There are no criteria for a default condition and hence no subscriptions.
//Hence the conditionResult must be manually triggered for it.
if (conditionConfiguration.isDefault) {
this.handleConditionResult();
}
}
createCondition(conditionConfiguration) {
@ -150,18 +174,10 @@ export default class ConditionManager extends EventEmitter {
this.persistConditions();
}
handleConditionResult(resultObj) {
getCurrentCondition() {
const conditionCollection = this.conditionSetDomainObject.configuration.conditionCollection;
let currentCondition = conditionCollection[conditionCollection.length-1];
if (resultObj) {
const id = resultObj.id;
if (this.findConditionById(id)) {
this.conditionResults[id] = resultObj.data.result;
}
this.updateTimestamp(resultObj.data);
}
for (let i = 0; i < conditionCollection.length - 1; i++) {
if (this.conditionResults[conditionCollection[i].id]) {
//first condition to be true wins
@ -169,7 +185,27 @@ export default class ConditionManager extends EventEmitter {
break;
}
}
return currentCondition;
}
updateConditionResults(resultObj) {
if (!resultObj) {
return;
}
const id = resultObj.id;
if (this.findConditionById(id)) {
this.conditionResults[id] = resultObj.data.result;
}
this.updateTimestamp(resultObj.data);
}
handleConditionResult(resultObj) {
// update conditions results and then calculate the current condition
this.updateConditionResults(resultObj);
const currentCondition = this.getCurrentCondition();
this.emit('conditionSetResultUpdated',
Object.assign(
{
@ -192,14 +228,50 @@ export default class ConditionManager extends EventEmitter {
});
}
requestLADConditionSetOutput() {
if (!this.conditionClassCollection.length || this.conditionClassCollection.length === 1) {
return Promise.resolve([]);
}
return this.compositionLoad.then(() => {
const ladConditionResults = this.conditionClassCollection
.map(condition => condition.requestLADConditionResult());
return Promise.all(ladConditionResults)
.then((results) => {
results.forEach(resultObj => { this.updateConditionResults(resultObj); });
const currentCondition = this.getCurrentCondition();
return Object.assign(
{
output: currentCondition.configuration.output,
id: this.conditionSetDomainObject.identifier,
conditionId: currentCondition.id
},
this.latestTimestamp
);
});
});
}
broadcastTelemetry(id, datum) {
this.emit(`broadcastTelemetry`, Object.assign({}, datum, {id: id}));
}
persistConditions() {
this.openmct.objects.mutate(this.conditionSetDomainObject, 'configuration.conditionCollection', this.conditionSetDomainObject.configuration.conditionCollection);
}
destroy() {
this.composition.off('add', this.subscribeToTelemetry, this);
this.composition.off('remove', this.unsubscribeFromTelemetry, this);
Object.values(this.subscriptions).forEach(unsubscribe => unsubscribe());
this.subscriptions = undefined;
if(this.stopObservingForChanges) {
this.stopObservingForChanges();
}
this.conditionClassCollection.forEach((condition) => {
condition.off('conditionResultUpdated', this.handleConditionResult);
condition.destroy();

View File

@ -47,6 +47,8 @@ describe('ConditionManager', () => {
]
}
};
let mockComposition;
let loader;
function mockAngularComponents() {
let mockInjector = jasmine.createSpyObj('$injector', ['get']);
@ -71,9 +73,33 @@ describe('ConditionManager', () => {
openmct.$injector = mockInjector;
}
beforeAll(function () {
beforeEach(function () {
mockAngularComponents();
mockListener = jasmine.createSpy('mockListener');
loader = {};
loader.promise = new Promise(function (resolve, reject) {
loader.resolve = resolve;
loader.reject = reject;
});
mockComposition = jasmine.createSpyObj('compositionCollection', [
'load',
'on',
'off'
]);
mockComposition.load.and.callFake(() => {
setTimeout(() => {
loader.resolve();
});
return loader.promise;
});
mockComposition.on('add', mockListener);
mockComposition.on('remove', mockListener);
openmct.composition = jasmine.createSpyObj('compositionAPI', [
'get'
]);
openmct.composition.get.and.returnValue(mockComposition);
openmct.objects = jasmine.createSpyObj('objects', ['get', 'makeKeyString', 'observe', 'mutate']);
openmct.objects.get.and.returnValues(new Promise(function (resolve, reject) {
@ -84,10 +110,11 @@ describe('ConditionManager', () => {
openmct.objects.makeKeyString.and.returnValue(conditionSetDomainObject.identifier.key);
openmct.objects.observe.and.returnValue(function () {});
openmct.objects.mutate.and.returnValue(function () {});
conditionMgr = new ConditionManager(conditionSetDomainObject, openmct);
mockListener = jasmine.createSpy('mockListener');
conditionMgr.on('conditionSetResultUpdated', mockListener);
conditionMgr.on('broadcastTelemetry', mockListener);
});
it('creates a conditionCollection with a default condition', function () {

View File

@ -25,27 +25,60 @@ import ConditionManager from './ConditionManager'
export default class ConditionSetTelemetryProvider {
constructor(openmct) {
this.openmct = openmct;
this.conditionManagerPool = {};
}
isTelemetryObject(domainObject) {
return domainObject.type === 'conditionSet';
}
supportsRequest(domainObject, options) {
return false;
supportsRequest(domainObject) {
return domainObject.type === 'conditionSet';
}
supportsSubscribe(domainObject) {
return domainObject.type === 'conditionSet';
}
request(domainObject) {
let conditionManager = this.getConditionManager(domainObject);
return conditionManager.requestLADConditionSetOutput()
.then(latestOutput => {
return latestOutput ? [latestOutput] : [];
});
}
subscribe(domainObject, callback) {
let conditionManager = new ConditionManager(domainObject, this.openmct);
let conditionManager = this.getConditionManager(domainObject);
conditionManager.on('conditionSetResultUpdated', callback);
return function unsubscribe() {
conditionManager.off('conditionSetResultUpdated');
conditionManager.destroy();
return this.destroyConditionManager.bind(this, this.openmct.objects.makeKeyString(domainObject.identifier));
}
/**
* returns conditionManager instance for corresponding domain object
* creates the instance if it is not yet created
* @private
*/
getConditionManager(domainObject) {
const id = this.openmct.objects.makeKeyString(domainObject.identifier);
if (!this.conditionManagerPool[id]) {
this.conditionManagerPool[id] = new ConditionManager(domainObject, this.openmct);
}
return this.conditionManagerPool[id];
}
/**
* cleans up and destroys conditionManager instance for corresponding domain object id
* can be called manually for views that only request but do not subscribe to data
*/
destroyConditionManager(id) {
this.conditionManagerPool[id].off('conditionSetResultUpdated');
this.conditionManagerPool[id].destroy();
delete this.conditionManagerPool[id];
}
}

View File

@ -28,11 +28,19 @@ let openmct = {},
mockListener,
testConditionDefinition,
testTelemetryObject,
conditionObj;
conditionObj,
conditionManager,
mockBroadcastTelemetry;
describe("The condition", function () {
beforeEach (() => {
conditionManager = jasmine.createSpyObj('conditionManager',
['on']
);
mockBroadcastTelemetry = jasmine.createSpy('listener');
conditionManager.on('broadcastTelemetry', mockBroadcastTelemetry);
mockListener = jasmine.createSpy('listener');
testTelemetryObject = {
identifier:{ namespace: "", key: "test-object"},
@ -66,11 +74,14 @@ describe("The condition", function () {
testConditionDefinition = {
id: '123-456',
configuration: {
name: 'mock condition',
output: 'mock output',
trigger: TRIGGER.ANY,
criteria: [
{
id: '1234-5678-9999-0000',
operation: 'equalTo',
input: false,
input: ['0'],
metadata: 'value',
telemetry: testTelemetryObject.identifier
}
@ -80,14 +91,14 @@ describe("The condition", function () {
conditionObj = new Condition(
testConditionDefinition,
openmct
openmct,
conditionManager
);
conditionObj.on('conditionUpdated', mockListener);
});
it("generates criteria with an id", function () {
it("generates criteria with the correct properties", function () {
const testCriterion = testConditionDefinition.configuration.criteria[0];
let criterion = conditionObj.generateCriterion(testCriterion);
expect(criterion.id).toBeDefined();
@ -98,6 +109,7 @@ describe("The condition", function () {
});
it("initializes with an id", function () {
console.log(conditionObj);
expect(conditionObj.id).toBeDefined();
});

View File

@ -107,6 +107,7 @@ export default {
this.composition.off('add', this.addTelemetryObject);
this.composition.off('remove', this.removeTelemetryObject);
if(this.conditionManager) {
this.conditionManager.off('conditionSetResultUpdated', this.handleConditionSetResultUpdated);
this.conditionManager.destroy();
}
if (this.stopObservingForChanges) {
@ -121,11 +122,12 @@ export default {
this.conditionCollection = this.domainObject.configuration.conditionCollection;
this.observeForChanges();
this.conditionManager = new ConditionManager(this.domainObject, this.openmct);
this.conditionManager.on('conditionSetResultUpdated', (data) => {
this.$emit('conditionSetResultUpdated', data);
})
this.conditionManager.on('conditionSetResultUpdated', this.handleConditionSetResultUpdated);
},
methods: {
handleConditionSetResultUpdated(data) {
this.$emit('conditionSetResultUpdated', data)
},
observeForChanges() {
this.stopObservingForChanges = this.openmct.objects.observe(this.domainObject, 'configuration.conditionCollection', (newConditionCollection) => {
this.conditionCollection = newConditionCollection;

View File

@ -44,33 +44,37 @@ export default class TelemetryCriterion extends EventEmitter {
this.operation = telemetryDomainObjectDefinition.operation;
this.input = telemetryDomainObjectDefinition.input;
this.metadata = telemetryDomainObjectDefinition.metadata;
this.subscription = null;
this.telemetryObjectIdAsString = null;
this.telemetryObjectIdAsString = undefined;
this.objectAPI.get(this.objectAPI.makeKeyString(this.telemetry)).then((obj) => this.initialize(obj));
}
initialize(obj) {
this.telemetryObject = obj;
this.telemetryMetaData = this.openmct.telemetry.getMetadata(obj).valueMetadatas;
this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetryObject.identifier);
this.telemetryObjectIdAsString = this.objectAPI.makeKeyString(this.telemetry);
this.on(`subscription:${this.telemetryObjectIdAsString}`, this.handleSubscription);
this.emitEvent('criterionUpdated', this);
}
handleSubscription(data) {
if (data) {
data = this.createNormalizedDatum(data);
}
formatData(data) {
const normalizedDatum = this.createNormalizedDatum(data);
const datum = {
result: this.computeResult(data)
};
if (data) {
// TODO check back to see if we should format times here
this.timeAPI.getAllTimeSystems().forEach(timeSystem => {
datum[timeSystem.key] = data[timeSystem.key]
});
result: this.computeResult(normalizedDatum)
}
this.emitEvent('criterionResultUpdated', datum);
if (normalizedDatum) {
// TODO check back to see if we should format times here
this.timeAPI.getAllTimeSystems().forEach(timeSystem => {
datum[timeSystem.key] = normalizedDatum[timeSystem.key]
});
}
return datum;
}
handleSubscription(data) {
if(this.isValid()) {
this.emitEvent('criterionResultUpdated', this.formatData(data));
}
}
createNormalizedDatum(telemetryDatum) {
@ -116,34 +120,33 @@ export default class TelemetryCriterion extends EventEmitter {
return this.telemetryObject && this.metadata && this.operation;
}
/**
* Subscribes to the telemetry object and returns an unsubscribe function
* If the telemetry is not valid, returns nothing
*/
subscribe() {
if (this.isValid()) {
this.unsubscribe();
this.subscription = this.telemetryAPI.subscribe(this.telemetryObject, (datum) => {
this.handleSubscription(datum);
});
} else {
this.handleSubscription();
}
}
requestLAD(options) {
options = Object.assign({},
options,
{
strategy: 'latest',
size: 1
}
);
/**
* Calls an unsubscribe function returned by subscribe() and deletes any initialized data
*/
unsubscribe() {
//unsubscribe from telemetry source
if (typeof this.subscription === 'function') {
this.subscription();
if (!this.isValid()) {
return this.formatData({});
}
delete this.subscription;
return this.telemetryAPI.request(
this.telemetryObject,
options
).then(results => {
const latestDatum = results.length ? results[results.length - 1] : {};
return {
id: this.id,
data: this.formatData(latestDatum)
};
});
}
destroy() {
this.unsubscribe();
this.off(`subscription:${this.telemetryObjectIdAsString}`, this.handleSubscription);
this.emitEvent('criterionRemoved');
delete this.telemetryObjectIdAsString;
delete this.telemetryObject;

View File

@ -78,7 +78,9 @@ describe("The telemetry criterion", function () {
testCriterionDefinition = {
id: 'test-criterion-id',
telemetry: openmct.objects.makeKeyString(testTelemetryObject.identifier)
telemetry: openmct.objects.makeKeyString(testTelemetryObject.identifier),
operation: 'lessThan',
metadata: 'sin'
};
mockListener = jasmine.createSpy('listener');
@ -102,12 +104,7 @@ describe("The telemetry criterion", function () {
expect(mockListener2).toHaveBeenCalled();
});
it("subscribes to telemetry providers", function () {
telemetryCriterion.subscribe();
expect(telemetryCriterion.subscription).toBeDefined();
});
it("emits update event on new data from telemetry providers", function () {
it("updates and emits event on new data from telemetry providers", function () {
telemetryCriterion.initialize(testTelemetryObject);
spyOn(telemetryCriterion, 'emitEvent').and.callThrough();
telemetryCriterion.handleSubscription({
@ -115,16 +112,5 @@ describe("The telemetry criterion", function () {
utc: 'Hi'
});
expect(telemetryCriterion.emitEvent).toHaveBeenCalled();
expect(mockListener).toHaveBeenCalled();
});
it("un-subscribes from telemetry providers", function () {
telemetryCriterion.subscribe();
expect(telemetryCriterion.subscription).toBeDefined();
telemetryCriterion.destroy();
expect(telemetryCriterion.subscription).toBeUndefined();
expect(telemetryCriterion.telemetryObjectIdAsString).toBeUndefined();
expect(telemetryCriterion.telemetryObject).toBeUndefined();
});
});