[Tables] Added telemetry buffer so that subscription data is not discarded if it's beyond the end bounds

This commit is contained in:
Henry 2017-01-23 12:43:59 -08:00
parent ae2b73a4f5
commit 6cd99efbb9
5 changed files with 152 additions and 114 deletions

View File

@ -52,6 +52,7 @@
function onSubscribe(message) {
var data = message.data;
// Keep
var start = Date.now();
var step = 1000 / data.dataRateInHz;
var nextStep = start - (start % step) + step;

View File

@ -23,24 +23,23 @@
define(
[
'lodash',
'eventEmitter'
'EventEmitter'
],
function (_, eventEmitter) {
function (_, EventEmitter) {
function TelemetryCollection() {
eventEmitter.call(this, arguments);
EventEmitter.call(this, arguments);
this.telemetry = [];
this.forwardBuffer = [];
this.highBuffer = [];
this.sortField = undefined;
this.lastBounds = {};
this.lastStartIndex = 0;
this.lastEndIndex = 0;
_.bindAll(this,[
'addOne',
'iteratee'
]);
}
TelemetryCollection.prototype = Object.create(eventEmitter.prototype);
TelemetryCollection.prototype = Object.create(EventEmitter.prototype);
TelemetryCollection.prototype.iteratee = function (element) {
return _.get(element, this.sortField);
@ -53,7 +52,7 @@ define(
* collection is cleared and repopulated.
* @param bounds
*/
TelemetryCollection.prototype.tick = function (bounds) {
TelemetryCollection.prototype.bounds = function (bounds) {
var startChanged = this.lastBounds.start !== bounds.start;
var endChanged = this.lastBounds.end !== bounds.end;
var startIndex = 0;
@ -65,71 +64,106 @@ define(
var testValue = _.set({}, this.sortField, bounds.start);
// Calculate the new index of the first element within the bounds
startIndex = _.sortedIndex(this.telemetry, testValue, this.sortField);
discarded = this.telemetry.slice(this.lastStartIndex, startIndex + 1);
discarded = this.telemetry.splice(0, startIndex);
}
if (endChanged) {
var testValue = _.set({}, this.sortField, bounds.end);
// Calculate the new index of the last element in bounds
endIndex = _.sortedLastIndex(this.telemetry, testValue, this.sortField);
added = this.telemetry.slice(this.lastEndIndex, endIndex + 1);
endIndex = _.sortedLastIndex(this.highBuffer, testValue, this.sortField);
added = this.highBuffer.splice(0, endIndex);
this.telemetry = this.telemetry.concat(added);
}
if (discarded.length > 0){
if (discarded && discarded.length > 0){
this.emit('discarded', discarded);
}
if (added.length > 0){
if (added && added.length > 0) {
this.emit('added', added);
}
this.lastBounds = bounds;
};
/*collection.on('added');
collection.on('discarded');*/
TelemetryCollection.prototype.inBounds = function (element) {
var noBoundsDefined = !this.lastBounds || (!this.lastBounds.start && !this.lastBounds.end);
var withinBounds = _.get(element, this.sortField) >= this.lastBounds.start &&
var withinBounds =
_.get(element, this.sortField) >= this.lastBounds.start &&
_.get(element, this.sortField) <= this.lastBounds.end;
return noBoundsDefined || withinBounds;
};
//Todo: addAll for initial historical data
TelemetryCollection.prototype.add = function (element) {
// Going to check for duplicates. Bound the search problem to
// elements around the given time. Use sortedIndex because it
// employs a binary search which is O(log n). Can use binary search
// based on time stamp because the array is guaranteed ordered due
// to sorted insertion.
/**
* @private
* @param element
*/
TelemetryCollection.prototype.addOne = function (element) {
var isDuplicate = false;
var startIx = _.sortedIndex(this.telemetry, element, this.sortField);
var boundsDefined = this.lastBounds && (this.lastBounds.start && this.lastBounds.end);
var array;
if (startIx !== this.telemetry.length) {
var endIx = _.sortedLastIndex(this.telemetry, element, this.sortField);
// Insert into either in-bounds array, or the out of bounds high buffer.
// Data in the high buffer will be re-evaluated for possible insertion on next tick
// Create an array of potential dupes, based on having the
// same time stamp
var potentialDupes = this.telemetry.slice(startIx, endIx + 1);
// Search potential dupes for exact dupe
isDuplicate = _.findIndex(potentialDupes, _.isEqual.bind(undefined, element)) > -1;
if (boundsDefined) {
var boundsHigh = _.get(element, this.sortField) > this.lastBounds.end;
var boundsLow = _.get(element, this.sortField) < this.lastBounds.start;
if (!boundsHigh && !boundsLow) {
array = this.telemetry;
} else if (boundsHigh) {
array = this.highBuffer;
}
} else {
array = this.highBuffer;
}
if (!isDuplicate) {
this.telemetry.splice(startIx, 0, element);
// If out of bounds low, disregard data
if (!boundsLow) {
// Going to check for duplicates. Bound the search problem to
// elements around the given time. Use sortedIndex because it
// employs a binary search which is O(log n). Can use binary search
// based on time stamp because the array is guaranteed ordered due
// to sorted insertion.
if (this.inBounds(element)) {
// If new element is within bounds, then the index within the
// master of the last element in bounds has just increased by one.
this.lastEndIndex++;
//If the new element is within bounds, add it immediately
this.emit('added', [element]);
var startIx = _.sortedIndex(array, element, this.sortField);
if (startIx !== array.length) {
var endIx = _.sortedLastIndex(array, element, this.sortField);
// Create an array of potential dupes, based on having the
// same time stamp
var potentialDupes = array.slice(startIx, endIx + 1);
// Search potential dupes for exact dupe
isDuplicate = _.findIndex(potentialDupes, _.isEqual.bind(undefined, element)) > -1;
}
if (!isDuplicate) {
array.splice(startIx, 0, element);
//Return true if it was added and in bounds
return array === this.telemetry;
}
}
return false;
};
TelemetryCollection.prototype.addAll = function (elements) {
var added = elements.filter(this.addOne);
this.emit('added', added);
};
//Todo: addAll for initial historical data
TelemetryCollection.prototype.add = function (element) {
if (this.addOne(element)){
this.emit('added', [element]);
return true;
} else {
return false;
}
};
TelemetryCollection.prototype.clear = function () {
this.telemetry = undefined;
this.telemetry = [];
};
TelemetryCollection.prototype.sort = function (sortField){

View File

@ -31,32 +31,32 @@ define(
//Bind all class functions to 'this'
_.bindAll(this, [
'destroyConductorListeners',
'changeTimeSystem',
'scrollToBottom',
'addRow',
'removeRows',
'onScroll',
'firstVisible',
'lastVisible',
'setVisibleRows',
'setHeaders',
'setElementSizes',
'addRows',
'binarySearch',
'insertSorted',
'sortComparator',
'sortRows',
'buildLargestRow',
'resize',
'filterAndSort',
'setRows',
'filterRows',
'scrollToRow',
'setTimeOfInterestRow',
'changeTimeOfInterest',
'changeBounds',
'changeTimeOfInterest',
'changeTimeSystem',
'destroyConductorListeners',
'digest',
'filterAndSort',
'filterRows',
'firstVisible',
'insertSorted',
'lastVisible',
'onRowClick',
'digest'
'onScroll',
'removeRows',
'resize',
'scrollToBottom',
'scrollToRow',
'setElementSizes',
'setHeaders',
'setRows',
'setTimeOfInterestRow',
'setVisibleRows',
'sortComparator',
'sortRows'
]);
this.scrollable.on('scroll', this.onScroll);
@ -125,7 +125,7 @@ define(
/*
* Listen for rows added individually (eg. for real-time tables)
*/
$scope.$on('add:row', this.addRow);
$scope.$on('add:rows', this.addRows);
$scope.$on('remove:rows', this.removeRows);
/**
@ -199,16 +199,13 @@ define(
* `add:row` broadcast event.
* @private
*/
MCTTableController.prototype.addRow = function (event, rowIndex) {
var row = this.$scope.rows[rowIndex];
MCTTableController.prototype.addRows = function (event, rows) {
//Does the row pass the current filter?
if (this.filterRows([row]).length === 1) {
//Insert the row into the correct position in the array
this.insertSorted(this.$scope.displayRows, row);
if (this.filterRows(rows).length > 0) {
rows.forEach(this.insertSorted.bind(this, this.$scope.displayRows));
//Resize the columns , then update the rows visible in the table
this.resize([this.$scope.sizingRow, row])
this.resize([this.$scope.sizingRow].concat(rows))
.then(this.setVisibleRows)
.then(function () {
if (this.$scope.autoScroll) {
@ -220,7 +217,6 @@ define(
if (toi !== -1) {
this.setTimeOfInterestRow(toi);
}
}
};

View File

@ -79,7 +79,9 @@ define(
'getHistoricalData',
'subscribeToNewData',
'changeBounds',
'setScroll'
'setScroll',
'addRowsToTable',
'removeRowsFromTable',
]);
this.getData();
@ -88,6 +90,9 @@ define(
this.openmct.conductor.on('follow', this.setScroll);
this.setScroll(this.openmct.conductor.follow());
this.telemetry.on('added', this.addRowsToTable);
this.telemetry.on('discarded', this.removeRowsFromTable);
this.$scope.$on("$destroy", this.destroy);
}
@ -102,16 +107,19 @@ define(
*/
TelemetryTableController.prototype.sortByTimeSystem = function (timeSystem) {
var scope = this.$scope;
var sortColumn = undefined;
scope.defaultSort = undefined;
if (timeSystem) {
this.table.columns.forEach(function (column) {
if (column.metadata.key === timeSystem.metadata.key) {
scope.defaultSort = column.getTitle();
sortColumn = column;
}
});
this.$scope.rows = _.sortBy(this.$scope.rows, function (row) {
return row[this.$scope.defaultSort];
});
if (sortColumn) {
scope.defaultSort = sortColumn.getTitle();
this.telemetry.sort(sortColumn.getTitle() + '.value');
}
}
};
@ -138,31 +146,23 @@ define(
this.openmct.conductor.on('bounds', this.changeBounds);
};
TelemetryTableController.prototype.addRowsToTable = function (rows) {
this.$scope.$broadcast('add:rows', rows);
};
TelemetryTableController.prototype.removeRowsFromTable = function (rows) {
this.$scope.$broadcast('remove:rows', rows);
};
TelemetryTableController.prototype.changeBounds = function (bounds) {
//console.log('bounds.end: ' + bounds.end);
var follow = this.openmct.conductor.follow();
var isTick = follow &&
bounds.start !== this.lastBounds.start &&
bounds.end !== this.lastBounds.end;
var isDeltaChange = follow &&
!isTick &&
(bounds.start !== this.lastBounds.start ||
bounds.end !== this.lastBounds.end);
var discarded = this.telemetry.bounds(bounds);
if (discarded.length > 0){
this.$scope.$broadcast('remove:rows', discarded);
}
if (isTick){
// Treat it as a realtime tick
// Drop old data that falls outside of bounds
//this.tick(bounds);
} else if (isDeltaChange){
// No idea...
// Historical query for bounds, then tick on
this.getData();
this.telemetry.bounds(bounds);
} else {
// Is fixed bounds change
this.getData();
@ -212,19 +212,23 @@ define(
var allColumns = telemetryApi.commonValuesForHints(metadatas, []);
this.table.populateColumns(allColumns);
this.timeColumns = telemetryApi.commonValuesForHints(metadatas, ['x']).map(function (metadatum) {
var domainColumns = telemetryApi.commonValuesForHints(metadatas, ['x']);
this.timeColumns = domainColumns.map(function (metadatum) {
return metadatum.name;
});
// For now, use first time field for time conductor
this.telemetry.sort(this.timeColumns[0] + '.value');
this.filterColumns();
var timeSystem = this.openmct.conductor.timeSystem();
if (timeSystem) {
this.sortByTimeSystem(timeSystem);
}
if (!this.telemetry.sortColumn && domainColumns.length > 0) {
this.telemetry.sort(domainColumns[0].name + '.value');
}
}
return objects;
};
@ -245,6 +249,7 @@ define(
return new Promise(function (resolve, reject){
function finishProcessing(){
telemetryCollection.addAll(rowData);
scope.rows = telemetryCollection.telemetry;
scope.loading = false;
resolve(scope.rows);
@ -258,11 +263,9 @@ define(
finishProcessing();
}
} else {
historicalData.slice(index, index + this.batchSize)
.forEach(function (datum) {
telemetryCollection.add(this.table.getRowValues(
limitEvaluator, datum));
}.bind(this));
rowData = rowData.concat(historicalData.slice(index, index + this.batchSize)
.map(this.table.getRowValues.bind(this.table, limitEvaluator)));
this.timeoutHandle = this.$timeout(processData.bind(
this,
historicalData,
@ -300,6 +303,7 @@ define(
}.bind(this));
};
/**
* @private
* @param objects
@ -348,6 +352,9 @@ define(
var scope = this.$scope;
var newObject = this.newObject;
this.telemetry.clear();
this.telemetry.bounds(this.openmct.conductor.bounds());
this.$scope.loading = true;
function error(e) {
@ -384,7 +391,7 @@ define(
getDomainObjects()
.then(filterForTelemetry)
.then(this.loadColumns)
.then(this.subscribeToNewData)
//.then(this.subscribeToNewData)
.then(this.getHistoricalData)
.catch(error)
};

View File

@ -465,20 +465,20 @@ define(
mockScope.displayRows = controller.sortRows(testRows.slice(0));
mockScope.rows.push(row4);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[0].col2.text).toEqual('xyz');
mockScope.rows.push(row5);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[4].col2.text).toEqual('aaa');
mockScope.rows.push(row6);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[2].col2.text).toEqual('ggg');
//Add a duplicate row
mockScope.rows.push(row6);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[2].col2.text).toEqual('ggg');
expect(mockScope.displayRows[3].col2.text).toEqual('ggg');
});
@ -494,12 +494,12 @@ define(
mockScope.displayRows = controller.filterRows(testRows);
mockScope.rows.push(row5);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows.length).toBe(2);
expect(mockScope.displayRows[1].col2.text).toEqual('aaa');
mockScope.rows.push(row6);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows.length).toBe(2);
//Row was not added because does not match filter
});
@ -513,11 +513,11 @@ define(
mockScope.displayRows = testRows.slice(0);
mockScope.rows.push(row5);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[3].col2.text).toEqual('aaa');
mockScope.rows.push(row6);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(mockScope.displayRows[4].col2.text).toEqual('ggg');
});
@ -536,7 +536,7 @@ define(
mockScope.displayRows = testRows.slice(0);
mockScope.rows.push(row7);
controller.addRow(undefined, mockScope.rows.length - 1);
controller.addRows(undefined, mockScope.rows.length - 1);
expect(controller.$scope.sizingRow.col2).toEqual({text: 'some longer string'});
});