mirror of
https://github.com/nasa/openmct.git
synced 2025-06-15 05:38:12 +00:00
[Telemetry Collections] Add Telemetry Collection Functionality to Telemetry API (#3689)
Adds telemetry collections to the telemetry API Co-authored-by: Shefali Joshi <simplyrender@gmail.com> Co-authored-by: Andrew Henry <akhenry@gmail.com>
This commit is contained in:
@ -23,20 +23,18 @@
|
||||
define([
|
||||
'EventEmitter',
|
||||
'lodash',
|
||||
'./collections/BoundedTableRowCollection',
|
||||
'./collections/FilteredTableRowCollection',
|
||||
'./TelemetryTableNameColumn',
|
||||
'./collections/TableRowCollection',
|
||||
'./TelemetryTableRow',
|
||||
'./TelemetryTableNameColumn',
|
||||
'./TelemetryTableColumn',
|
||||
'./TelemetryTableUnitColumn',
|
||||
'./TelemetryTableConfiguration'
|
||||
], function (
|
||||
EventEmitter,
|
||||
_,
|
||||
BoundedTableRowCollection,
|
||||
FilteredTableRowCollection,
|
||||
TelemetryTableNameColumn,
|
||||
TableRowCollection,
|
||||
TelemetryTableRow,
|
||||
TelemetryTableNameColumn,
|
||||
TelemetryTableColumn,
|
||||
TelemetryTableUnitColumn,
|
||||
TelemetryTableConfiguration
|
||||
@ -48,20 +46,23 @@ define([
|
||||
this.domainObject = domainObject;
|
||||
this.openmct = openmct;
|
||||
this.rowCount = 100;
|
||||
this.subscriptions = {};
|
||||
this.tableComposition = undefined;
|
||||
this.telemetryObjects = [];
|
||||
this.datumCache = [];
|
||||
this.outstandingRequests = 0;
|
||||
this.configuration = new TelemetryTableConfiguration(domainObject, openmct);
|
||||
this.paused = false;
|
||||
this.keyString = this.openmct.objects.makeKeyString(this.domainObject.identifier);
|
||||
|
||||
this.telemetryObjects = {};
|
||||
this.telemetryCollections = {};
|
||||
this.delayedActions = [];
|
||||
this.outstandingRequests = 0;
|
||||
|
||||
this.addTelemetryObject = this.addTelemetryObject.bind(this);
|
||||
this.removeTelemetryObject = this.removeTelemetryObject.bind(this);
|
||||
this.removeTelemetryCollection = this.removeTelemetryCollection.bind(this);
|
||||
this.resetRowsFromAllData = this.resetRowsFromAllData.bind(this);
|
||||
this.isTelemetryObject = this.isTelemetryObject.bind(this);
|
||||
this.refreshData = this.refreshData.bind(this);
|
||||
this.requestDataFor = this.requestDataFor.bind(this);
|
||||
this.updateFilters = this.updateFilters.bind(this);
|
||||
this.buildOptionsFromConfiguration = this.buildOptionsFromConfiguration.bind(this);
|
||||
|
||||
@ -102,8 +103,7 @@ define([
|
||||
}
|
||||
|
||||
createTableRowCollections() {
|
||||
this.boundedRows = new BoundedTableRowCollection(this.openmct);
|
||||
this.filteredRows = new FilteredTableRowCollection(this.boundedRows);
|
||||
this.tableRows = new TableRowCollection();
|
||||
|
||||
//Fetch any persisted default sort
|
||||
let sortOptions = this.configuration.getConfiguration().sortOptions;
|
||||
@ -113,11 +113,14 @@ define([
|
||||
key: this.openmct.time.timeSystem().key,
|
||||
direction: 'asc'
|
||||
};
|
||||
this.filteredRows.sortBy(sortOptions);
|
||||
|
||||
this.tableRows.sortBy(sortOptions);
|
||||
this.tableRows.on('resetRowsFromAllData', this.resetRowsFromAllData);
|
||||
}
|
||||
|
||||
loadComposition() {
|
||||
this.tableComposition = this.openmct.composition.get(this.domainObject);
|
||||
|
||||
if (this.tableComposition !== undefined) {
|
||||
this.tableComposition.load().then((composition) => {
|
||||
|
||||
@ -132,66 +135,64 @@ define([
|
||||
|
||||
addTelemetryObject(telemetryObject) {
|
||||
this.addColumnsForObject(telemetryObject, true);
|
||||
this.requestDataFor(telemetryObject);
|
||||
this.subscribeTo(telemetryObject);
|
||||
this.telemetryObjects.push(telemetryObject);
|
||||
|
||||
const keyString = this.openmct.objects.makeKeyString(telemetryObject.identifier);
|
||||
let requestOptions = this.buildOptionsFromConfiguration(telemetryObject);
|
||||
let columnMap = this.getColumnMapForObject(keyString);
|
||||
let limitEvaluator = this.openmct.telemetry.limitEvaluator(telemetryObject);
|
||||
|
||||
this.incrementOutstandingRequests();
|
||||
|
||||
const telemetryProcessor = this.getTelemetryProcessor(keyString, columnMap, limitEvaluator);
|
||||
const telemetryRemover = this.getTelemetryRemover();
|
||||
|
||||
this.removeTelemetryCollection(keyString);
|
||||
|
||||
this.telemetryCollections[keyString] = this.openmct.telemetry
|
||||
.requestTelemetryCollection(telemetryObject, requestOptions);
|
||||
|
||||
this.telemetryCollections[keyString].on('remove', telemetryRemover);
|
||||
this.telemetryCollections[keyString].on('add', telemetryProcessor);
|
||||
this.telemetryCollections[keyString].load();
|
||||
|
||||
this.decrementOutstandingRequests();
|
||||
|
||||
this.telemetryObjects[keyString] = {
|
||||
telemetryObject,
|
||||
keyString,
|
||||
requestOptions,
|
||||
columnMap,
|
||||
limitEvaluator
|
||||
};
|
||||
|
||||
this.emit('object-added', telemetryObject);
|
||||
}
|
||||
|
||||
updateFilters(updatedFilters) {
|
||||
let deepCopiedFilters = JSON.parse(JSON.stringify(updatedFilters));
|
||||
getTelemetryProcessor(keyString, columnMap, limitEvaluator) {
|
||||
return (telemetry) => {
|
||||
//Check that telemetry object has not been removed since telemetry was requested.
|
||||
if (!this.telemetryObjects[keyString]) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.filters && !_.isEqual(this.filters, deepCopiedFilters)) {
|
||||
this.filters = deepCopiedFilters;
|
||||
this.clearAndResubscribe();
|
||||
} else {
|
||||
this.filters = deepCopiedFilters;
|
||||
}
|
||||
let telemetryRows = telemetry.map(datum => new TelemetryTableRow(datum, columnMap, keyString, limitEvaluator));
|
||||
|
||||
if (this.paused) {
|
||||
this.delayedActions.push(this.tableRows.addRows.bind(this, telemetryRows, 'add'));
|
||||
} else {
|
||||
this.tableRows.addRows(telemetryRows, 'add');
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
clearAndResubscribe() {
|
||||
this.filteredRows.clear();
|
||||
this.boundedRows.clear();
|
||||
Object.keys(this.subscriptions).forEach(this.unsubscribe, this);
|
||||
|
||||
this.telemetryObjects.forEach(this.requestDataFor.bind(this));
|
||||
this.telemetryObjects.forEach(this.subscribeTo.bind(this));
|
||||
}
|
||||
|
||||
removeTelemetryObject(objectIdentifier) {
|
||||
this.configuration.removeColumnsForObject(objectIdentifier, true);
|
||||
let keyString = this.openmct.objects.makeKeyString(objectIdentifier);
|
||||
this.boundedRows.removeAllRowsForObject(keyString);
|
||||
this.unsubscribe(keyString);
|
||||
this.telemetryObjects = this.telemetryObjects.filter((object) => !_.eq(objectIdentifier, object.identifier));
|
||||
|
||||
this.emit('object-removed', objectIdentifier);
|
||||
}
|
||||
|
||||
requestDataFor(telemetryObject) {
|
||||
this.incrementOutstandingRequests();
|
||||
let requestOptions = this.buildOptionsFromConfiguration(telemetryObject);
|
||||
|
||||
return this.openmct.telemetry.request(telemetryObject, requestOptions)
|
||||
.then(telemetryData => {
|
||||
//Check that telemetry object has not been removed since telemetry was requested.
|
||||
if (!this.telemetryObjects.includes(telemetryObject)) {
|
||||
return;
|
||||
}
|
||||
|
||||
let keyString = this.openmct.objects.makeKeyString(telemetryObject.identifier);
|
||||
let columnMap = this.getColumnMapForObject(keyString);
|
||||
let limitEvaluator = this.openmct.telemetry.limitEvaluator(telemetryObject);
|
||||
this.processHistoricalData(telemetryData, columnMap, keyString, limitEvaluator);
|
||||
}).finally(() => {
|
||||
this.decrementOutstandingRequests();
|
||||
});
|
||||
}
|
||||
|
||||
processHistoricalData(telemetryData, columnMap, keyString, limitEvaluator) {
|
||||
let telemetryRows = telemetryData.map(datum => new TelemetryTableRow(datum, columnMap, keyString, limitEvaluator));
|
||||
this.boundedRows.add(telemetryRows);
|
||||
getTelemetryRemover() {
|
||||
return (telemetry) => {
|
||||
if (this.paused) {
|
||||
this.delayedActions.push(this.tableRows.removeRowsByData.bind(this, telemetry));
|
||||
} else {
|
||||
this.tableRows.removeRowsByData(telemetry);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -216,35 +217,72 @@ define([
|
||||
}
|
||||
}
|
||||
|
||||
// will pull all necessary information for all existing bounded telemetry
|
||||
// and pass to table row collection to reset without making any new requests
|
||||
// triggered by filtering
|
||||
resetRowsFromAllData() {
|
||||
let allRows = [];
|
||||
|
||||
Object.keys(this.telemetryCollections).forEach(keyString => {
|
||||
let { columnMap, limitEvaluator } = this.telemetryObjects[keyString];
|
||||
|
||||
this.telemetryCollections[keyString].getAll().forEach(datum => {
|
||||
allRows.push(new TelemetryTableRow(datum, columnMap, keyString, limitEvaluator));
|
||||
});
|
||||
});
|
||||
|
||||
this.tableRows.addRows(allRows, 'filter');
|
||||
}
|
||||
|
||||
updateFilters(updatedFilters) {
|
||||
let deepCopiedFilters = JSON.parse(JSON.stringify(updatedFilters));
|
||||
|
||||
if (this.filters && !_.isEqual(this.filters, deepCopiedFilters)) {
|
||||
this.filters = deepCopiedFilters;
|
||||
this.tableRows.clear();
|
||||
this.clearAndResubscribe();
|
||||
} else {
|
||||
this.filters = deepCopiedFilters;
|
||||
}
|
||||
}
|
||||
|
||||
clearAndResubscribe() {
|
||||
let objectKeys = Object.keys(this.telemetryObjects);
|
||||
|
||||
this.tableRows.clear();
|
||||
objectKeys.forEach((keyString) => {
|
||||
this.addTelemetryObject(this.telemetryObjects[keyString].telemetryObject);
|
||||
});
|
||||
}
|
||||
|
||||
removeTelemetryObject(objectIdentifier) {
|
||||
const keyString = this.openmct.objects.makeKeyString(objectIdentifier);
|
||||
|
||||
this.configuration.removeColumnsForObject(objectIdentifier, true);
|
||||
this.tableRows.removeRowsByObject(keyString);
|
||||
|
||||
this.removeTelemetryCollection(keyString);
|
||||
delete this.telemetryObjects[keyString];
|
||||
|
||||
this.emit('object-removed', objectIdentifier);
|
||||
}
|
||||
|
||||
refreshData(bounds, isTick) {
|
||||
if (!isTick && this.outstandingRequests === 0) {
|
||||
this.filteredRows.clear();
|
||||
this.boundedRows.clear();
|
||||
this.boundedRows.sortByTimeSystem(this.openmct.time.timeSystem());
|
||||
this.telemetryObjects.forEach(this.requestDataFor);
|
||||
if (!isTick && this.tableRows.outstandingRequests === 0) {
|
||||
this.tableRows.clear();
|
||||
this.tableRows.sortBy({
|
||||
key: this.openmct.time.timeSystem().key,
|
||||
direction: 'asc'
|
||||
});
|
||||
this.tableRows.resubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
clearData() {
|
||||
this.filteredRows.clear();
|
||||
this.boundedRows.clear();
|
||||
this.tableRows.clear();
|
||||
this.emit('refresh');
|
||||
}
|
||||
|
||||
getColumnMapForObject(objectKeyString) {
|
||||
let columns = this.configuration.getColumns();
|
||||
|
||||
if (columns[objectKeyString]) {
|
||||
return columns[objectKeyString].reduce((map, column) => {
|
||||
map[column.getKey()] = column;
|
||||
|
||||
return map;
|
||||
}, {});
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
addColumnsForObject(telemetryObject) {
|
||||
let metadataValues = this.openmct.telemetry.getMetadata(telemetryObject).values();
|
||||
|
||||
@ -264,54 +302,18 @@ define([
|
||||
});
|
||||
}
|
||||
|
||||
createColumn(metadatum) {
|
||||
return new TelemetryTableColumn(this.openmct, metadatum);
|
||||
}
|
||||
getColumnMapForObject(objectKeyString) {
|
||||
let columns = this.configuration.getColumns();
|
||||
|
||||
createUnitColumn(metadatum) {
|
||||
return new TelemetryTableUnitColumn(this.openmct, metadatum);
|
||||
}
|
||||
if (columns[objectKeyString]) {
|
||||
return columns[objectKeyString].reduce((map, column) => {
|
||||
map[column.getKey()] = column;
|
||||
|
||||
subscribeTo(telemetryObject) {
|
||||
let subscribeOptions = this.buildOptionsFromConfiguration(telemetryObject);
|
||||
let keyString = this.openmct.objects.makeKeyString(telemetryObject.identifier);
|
||||
let columnMap = this.getColumnMapForObject(keyString);
|
||||
let limitEvaluator = this.openmct.telemetry.limitEvaluator(telemetryObject);
|
||||
return map;
|
||||
}, {});
|
||||
}
|
||||
|
||||
this.subscriptions[keyString] = this.openmct.telemetry.subscribe(telemetryObject, (datum) => {
|
||||
//Check that telemetry object has not been removed since telemetry was requested.
|
||||
if (!this.telemetryObjects.includes(telemetryObject)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.paused) {
|
||||
let realtimeDatum = {
|
||||
datum,
|
||||
columnMap,
|
||||
keyString,
|
||||
limitEvaluator
|
||||
};
|
||||
|
||||
this.datumCache.push(realtimeDatum);
|
||||
} else {
|
||||
this.processRealtimeDatum(datum, columnMap, keyString, limitEvaluator);
|
||||
}
|
||||
}, subscribeOptions);
|
||||
}
|
||||
|
||||
processDatumCache() {
|
||||
this.datumCache.forEach(cachedDatum => {
|
||||
this.processRealtimeDatum(cachedDatum.datum, cachedDatum.columnMap, cachedDatum.keyString, cachedDatum.limitEvaluator);
|
||||
});
|
||||
this.datumCache = [];
|
||||
}
|
||||
|
||||
processRealtimeDatum(datum, columnMap, keyString, limitEvaluator) {
|
||||
this.boundedRows.add(new TelemetryTableRow(datum, columnMap, keyString, limitEvaluator));
|
||||
}
|
||||
|
||||
isTelemetryObject(domainObject) {
|
||||
return Object.prototype.hasOwnProperty.call(domainObject, 'telemetry');
|
||||
return {};
|
||||
}
|
||||
|
||||
buildOptionsFromConfiguration(telemetryObject) {
|
||||
@ -323,13 +325,20 @@ define([
|
||||
return {filters} || {};
|
||||
}
|
||||
|
||||
unsubscribe(keyString) {
|
||||
this.subscriptions[keyString]();
|
||||
delete this.subscriptions[keyString];
|
||||
createColumn(metadatum) {
|
||||
return new TelemetryTableColumn(this.openmct, metadatum);
|
||||
}
|
||||
|
||||
createUnitColumn(metadatum) {
|
||||
return new TelemetryTableUnitColumn(this.openmct, metadatum);
|
||||
}
|
||||
|
||||
isTelemetryObject(domainObject) {
|
||||
return Object.prototype.hasOwnProperty.call(domainObject, 'telemetry');
|
||||
}
|
||||
|
||||
sortBy(sortOptions) {
|
||||
this.filteredRows.sortBy(sortOptions);
|
||||
this.tableRows.sortBy(sortOptions);
|
||||
|
||||
if (this.openmct.editor.isEditing()) {
|
||||
let configuration = this.configuration.getConfiguration();
|
||||
@ -338,21 +347,36 @@ define([
|
||||
}
|
||||
}
|
||||
|
||||
runDelayedActions() {
|
||||
this.delayedActions.forEach(action => action());
|
||||
this.delayedActions = [];
|
||||
}
|
||||
|
||||
removeTelemetryCollection(keyString) {
|
||||
if (this.telemetryCollections[keyString]) {
|
||||
this.telemetryCollections[keyString].destroy();
|
||||
this.telemetryCollections[keyString] = undefined;
|
||||
delete this.telemetryCollections[keyString];
|
||||
}
|
||||
}
|
||||
|
||||
pause() {
|
||||
this.paused = true;
|
||||
this.boundedRows.unsubscribeFromBounds();
|
||||
}
|
||||
|
||||
unpause() {
|
||||
this.paused = false;
|
||||
this.processDatumCache();
|
||||
this.boundedRows.subscribeToBounds();
|
||||
this.runDelayedActions();
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.boundedRows.destroy();
|
||||
this.filteredRows.destroy();
|
||||
Object.keys(this.subscriptions).forEach(this.unsubscribe, this);
|
||||
this.tableRows.destroy();
|
||||
|
||||
this.tableRows.off('resetRowsFromAllData', this.resetRowsFromAllData);
|
||||
|
||||
let keystrings = Object.keys(this.telemetryCollections);
|
||||
keystrings.forEach(this.removeTelemetryCollection);
|
||||
|
||||
this.openmct.time.off('bounds', this.refreshData);
|
||||
this.openmct.time.off('timeSystem', this.refreshData);
|
||||
|
||||
|
Reference in New Issue
Block a user