[Plot] Use new API from PlotUpdater

Use PlotLine and PlotLineBuffer from PlotUpdater, to enable
merging of real-time and historical telemetry. WTD-806.
This commit is contained in:
Victor Woeltjen
2015-04-17 14:00:00 -07:00
parent 4d34f19aa2
commit 7370b1a87f
5 changed files with 255 additions and 224 deletions

View File

@ -6,67 +6,35 @@ define(
"use strict"; "use strict";
function PlotLine(initialSize, maxPoints) { function PlotLine(buffer) {
var buffer,
length = 0,
timeWindow;
// Binary search the buffer to find the index where // Insert a time-windowed data series into the buffer
// a point with this timestamp should be inserted. function insertSeriesWindow(seriesWindow) {
// After is a flag indicating whether it is preferred var count = seriesWindow.getPointCount();
// to insert after or before its nearest timestamp
function searchBuffer(timestamp, after) {
// Binary search for an appropriate insertion index.
function binSearch(min, max) {
var mid = Math.floor((min + max) / 2),
ts;
if (max < min) { function doInsert() {
return -1; var firstTimestamp = buffer.getDomainValue(0),
} lastTimestamp = buffer.getDomainValue(count - 1),
startIndex = buffer.findInsertionIndex(firstTimestamp),
endIndex = buffer.findInsertionIndex(lastTimestamp);
ts = buffer[mid * 2]; // Does the whole series fit in between two adjacent indexes?
if ((startIndex === endIndex) && startIndex > -1) {
// Check for an exact match... // Insert it in between
if (ts === timestamp) { buffer.insert(seriesWindow, startIndex);
// This is a case where we'll need to
// split up what we want to insert.
return mid + after ? -1 : 1;
} else { } else {
// Found our index? // Split it up, and add the two halves
if (max === min) { seriesWindow.split().forEach(insertSeriesWindow);
return max;
}
// Otherwise, search recursively
if (ts < timestamp) {
} else {
}
} }
} }
// Booleanize // Only insert if there are points to insert
after = !!after; if (count > 0) {
doInsert();
return binSearch(0, length - 1);
}
function insertSeriesWindow(seriesWindow) {
var startIndex = findStartIndex(),
endIndex = findEndIndex();
if (startIndex === endIndex) {
} else {
// Split it up, and add the two halves
seriesWindow.split().forEach(insertSeriesWindow);
} }
} }
function createWindow(series, domain, range) { function createWindow(series, domain, range) {
// TODO: Enforce time window, too!
return new PlotSeriesWindow( return new PlotSeriesWindow(
series, series,
domain, domain,
@ -77,8 +45,20 @@ define(
} }
return { return {
addData: function (domainValue, rangeValue) { getLineBuffer: function () {
// Should append to buffer return buffer;
},
addPoint: function (domainValue, rangeValue) {
var index = buffer.findInsertionIndex(domainValue);
if (index > -1) {
// Insert the point
if (!buffer.insertPoint(domainValue, rangeValue, index)) {
// If insertion failed, trim from the beginning...
buffer.trim(1);
// ...and try again.
buffer.insertPoint(domainValue, rangeValue, index);
}
}
}, },
addSeries: function (series, domain, range) { addSeries: function (series, domain, range) {
// Should try to add via insertion if a // Should try to add via insertion if a
@ -87,12 +67,6 @@ define(
// Insertion operation also needs to factor out // Insertion operation also needs to factor out
// redundant timestamps, for overlapping data // redundant timestamps, for overlapping data
insertSeriesWindow(createWindow(series, domain, range)); insertSeriesWindow(createWindow(series, domain, range));
},
setTimeWindow: function (start, end) {
timeWindow = [ start, end ];
},
clearTimeWindow: function () {
timeWindow = undefined;
} }
}; };
} }

View File

@ -14,6 +14,7 @@ define(
*/ */
function PlotLineBuffer(domainOffset, initialSize, maxSize) { function PlotLineBuffer(domainOffset, initialSize, maxSize) {
var buffer = new Float32Array(initialSize * 2), var buffer = new Float32Array(initialSize * 2),
rangeExtrema = [ Number.POSITIVE_INFINITY, Number.NEGATIVE_INFINITY ],
length = 0; length = 0;
// Binary search for an insertion index // Binary search for an insertion index
@ -68,6 +69,17 @@ define(
return canHalve; return canHalve;
} }
// Set a value in the buffer
function setValue(index, domainValue, rangeValue) {
buffer[index * 2] = domainValue - domainOffset;
buffer[index * 2 + 1] = rangeValue;
// Track min/max of range values (min/max for
// domain values can be read directly from buffer)
rangeExtrema = [
Math.min(rangeExtrema[0], rangeValue),
Math.max(rangeExtrema[1], rangeValue)
];
}
return { return {
/** /**
@ -84,6 +96,29 @@ define(
getLength: function () { getLength: function () {
return length; return length;
}, },
/**
* Get the min/max range values that are currently in this
* buffer. Unlike range extrema, these will change as the
* buffer gets trimmed.
* @returns {number[]} min, max domain values
*/
getDomainExtrema: function () {
// Since these are ordered in the buffer, assume
// these are the values at the first and last index
return [
buffer[0] + domainOffset,
buffer[length * 2 - 2] + domainOffset
];
},
/**
* Get the min/max range values that have been observed for this
* buffer. Note that these values may have been trimmed out at
* some point.
* @returns {number[]} min, max range values
*/
getRangeExtrema: function () {
return rangeExtrema;
},
/** /**
* Remove values from this buffer. * Remove values from this buffer.
* Normally, values are removed from the start * Normally, values are removed from the start
@ -119,14 +154,13 @@ define(
*/ */
insert: function (series, index) { insert: function (series, index) {
var sz = series.getPointCount(), var sz = series.getPointCount(),
free = (buffer.length / 2) - length,
i; i;
// Don't allow append after the end; that doesn't make sense // Don't allow append after the end; that doesn't make sense
index = Math.min(index, length); index = Math.min(index, length);
// Resize if necessary // Resize if necessary
if (sz > free) { while (sz > ((buffer.length / 2) - length)) {
if (!doubleBufferSize()) { if (!doubleBufferSize()) {
// Can't make room for this, insertion fails // Can't make room for this, insertion fails
return false; return false;
@ -143,10 +177,11 @@ define(
// Insert data into the set // Insert data into the set
for (i = 0; i < sz; i += 1) { for (i = 0; i < sz; i += 1) {
buffer[(i + index) * 2] = setValue(
series.getDomainValue(i) - domainOffset; i + index,
buffer[(i + index) * 2 + 1] = series.getDomainValue(i),
series.getRangeValue(i); series.getRangeValue(i)
);
} }
// Increase the length // Increase the length
@ -155,6 +190,29 @@ define(
// Indicate that insertion was successful // Indicate that insertion was successful
return true; return true;
}, },
/**
* Append a single data point.
*/
insertPoint: function (domainValue, rangeValue, index) {
// Don't allow
index = Math.min(length, index);
// Ensure there is space for this point
if (length >= (buffer.length / 2)) {
if (!doubleBufferSize()) {
return false;
}
}
// Put the data in the buffer
setValue(length, domainValue, rangeValue);
// Update length
length += 1;
// Indicate that this was successful
return true;
},
/** /**
* Find an index for inserting data with this * Find an index for inserting data with this
* timestamp. The second argument indicates whether * timestamp. The second argument indicates whether
@ -166,11 +224,15 @@ define(
* @returns {number} the index for insertion (or -1) * @returns {number} the index for insertion (or -1)
*/ */
findInsertionIndex: function (timestamp) { findInsertionIndex: function (timestamp) {
return binSearch( var value = timestamp - domainOffset;
timestamp - domainOffset,
0, // Handle empty buffer case and check for an
length - 1 // append opportunity (which is most common case for
); // real-time data so is optimized-for) before falling
// back to a binary search for the insertion point.
return (length < 1) ? 0 :
(value > buffer[length * 2 - 2]) ? length :
binSearch(value, 0, length - 1);
} }
}; };
} }

View File

@ -5,7 +5,8 @@
* the conversion from data API to displayable buffers. * the conversion from data API to displayable buffers.
*/ */
define( define(
function () { ['./PlotLine', './PlotLineBuffer'],
function (PlotLine, PlotLineBuffer) {
'use strict'; 'use strict';
var MAX_POINTS = 86400, var MAX_POINTS = 86400,
@ -17,164 +18,171 @@ define(
* Float32Array for each trace, and tracks the boundaries of the * Float32Array for each trace, and tracks the boundaries of the
* data sets (since this is convenient to do during the same pass). * data sets (since this is convenient to do during the same pass).
* @constructor * @constructor
* @param {Telemetry[]} datas telemetry data objects * @param {TelemetryHandle} handle the handle to telemetry access
* @param {string} domain the key to use when looking up domain values * @param {string} domain the key to use when looking up domain values
* @param {string} range the key to use when looking up range values * @param {string} range the key to use when looking up range values
*/ */
function PlotUpdater(subscription, domain, range, maxPoints) { function PlotUpdater(handle, domain, range, maxPoints) {
var max = [Number.NEGATIVE_INFINITY, Number.NEGATIVE_INFINITY], var ids = [],
min = [Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY], lines = {},
x, dimensions = [0, 0],
y, origin = [0, 0],
domainOffset, domainExtrema,
buffers = {}, rangeExtrema,
lengths = {}, bufferArray = [],
realtimeIndex = {}, domainOffset;
lengthArray = [],
bufferArray = [];
// Double the size of a Float32Array // Look up a domain object's id (for mapping, below)
function doubleSize(buffer) { function getId(domainObject) {
var doubled = new Float32Array(buffer.length * 2); return domainObject.getId();
doubled.set(buffer); // Copy contents of original
return doubled;
} }
// Make sure there is enough space in a buffer to accomodate a // Check if this set of ids matches the current set of ids
// new point at the specified index. This will updates buffers[id] // (used to detect if line preparation can be skipped)
// if necessary. function idsMatch(nextIds) {
function ensureBufferSize(buffer, id, index) { return nextIds.map(function (id, index) {
// Check if we don't have enough room return ids[index] === id;
if (index > (buffer.length / 2 - 1)) { }).reduce(function (a, b) {
// If we don't, can we expand? return a && b;
if (index < maxPoints) { }, true);
// Double the buffer size }
buffer = buffers[id] = doubleSize(buffer);
} else { // Prepare plot lines for this group of telemetry objects
// Just shift the existing buffer function prepareLines(telemetryObjects) {
buffer.set(buffer.subarray(2)); var nextIds = telemetryObjects.map(getId),
// Update the realtime index accordingly next = {};
realtimeIndex[id] = Math.max(realtimeIndex[id] - 1, 0);
} // Detect if we already have everything we need prepared
if (ids.length === nextIds.length && idsMatch(nextIds)) {
// Nothing to prepare, move on
return;
} }
return buffer; // Update list of ids in use
} ids = nextIds;
function setData(buffer, id, index, domainValue, rangeValue) { // Built up a set of ids. Note that we can only
// Make sure there's data to add, and then add it // create plot lines after our domain offset has
if (domainValue !== undefined && rangeValue !== undefined && // been determined.
(index < 1 || domainValue !== buffer[index * 2 - 2])) { if (domainOffset !== undefined) {
// Use the first observed domain value as a domainOffset bufferArray = ids.map(function (id) {
domainOffset = domainOffset !== undefined ? var buffer = new PlotLineBuffer(
domainOffset : domainValue; domainOffset,
// Ensure there is space for the new buffer INITIAL_SIZE,
buffer = ensureBufferSize(buffer, id, index); maxPoints
// Account for shifting that may have occurred );
index = Math.min(index, maxPoints - 1); next[id] = lines[id] || new PlotLine(buffer);
// Update the buffer return buffer;
buffer[index * 2] = domainValue - domainOffset; });
buffer[index * 2 + 1] = rangeValue;
// Update length
lengths[id] = Math.min(index + 1, maxPoints);
// Observe max/min range values
max[1] = Math.max(max[1], rangeValue);
min[1] = Math.min(min[1], rangeValue);
}
return buffer;
}
// Add data to the plot.
function addData(obj) {
var id = obj.getId(),
index = lengths[id] || 0,
buffer = buffers[id],
domainValue = subscription.getDomainValue(obj, domain),
rangeValue = subscription.getRangeValue(obj, range);
// If we don't already have a data buffer for that ID,
// make one.
if (!buffer) {
buffer = new Float32Array(INITIAL_SIZE);
buffers[id] = buffer;
} }
// Update the cutoff point for when we started receiving // If there are no more lines, clear the domain offset
// realtime data, to aid in clearing historical data later if (Object.keys(lines).length < 1) {
if (realtimeIndex[id] === undefined) { domainOffset = undefined;
realtimeIndex[id] = index;
} }
// Put the data in the buffer // Update to the current set of lines
return setData( lines = next;
buffer,
id,
index,
domainValue,
rangeValue
);
} }
// Update min/max domain values for these objects // Initialize the domain offset, based on these observed values
function updateDomainExtrema(objects) { function initializeDomainOffset(values) {
max[0] = Number.NEGATIVE_INFINITY; domainOffset =
min[0] = Number.POSITIVE_INFINITY; ((domainOffset === undefined) && (values.length > 0)) ?
objects.forEach(function (obj) { (values.reduce(function (a, b) {
var id = obj.getId(), return (a || 0) + (b || 0);
buffer = buffers[id], }, 0) / values.length) :
length = lengths[id], domainOffset;
low = buffer[0] + domainOffset,
high = buffer[length * 2 - 2] + domainOffset;
max[0] = Math.max(high, max[0]);
min[0] = Math.min(low, min[0]);
});
} }
// Update historical data for this domain object // Used in the reduce step of updateExtrema
function setHistorical(domainObject) { function reduceExtrema(a, b) {
var id = domainObject.getId(), return [ Math.min(a[0], b[0]), Math.max(a[1], b[1]) ];
// Buffer to expand }
buffer = buffers[id],
// Index where historical data ends (and realtime begins)
endIndex = realtimeIndex[id] || 0,
// Look up the data series
series = subscription.getSeries(domainObject),
// Get its length
seriesLength = series ? series.getPointCount() : 0,
// Get the current buffer length...
length = lengths[id] || 0,
// As well as the length of the realtime segment
realtimeLength = length - endIndex,
// Determine the new total length of the existing
// realtime + new historical segment.
totalLength =
Math.min(seriesLength + realtimeLength, maxPoints),
seriesFit = Math.max(0, totalLength - realtimeLength);
// Make sure the buffer is big enough // Convert a domain/range extrema to plot dimensions
function dimensionsOf(extrema) {
return extrema[1] - extrema[0];
}
// Move the realtime data into the correct position // Convert a domain/range extrema to a plot origin
function originOf(extrema) {
return extrema[0];
}
// Insert the historical data before it // Update dimensions and origin based on extrema of plots
function updateExtrema() {
domainExtrema = bufferArray.map(function (lineBuffer) {
return lineBuffer.getDomainExtrema();
}).reduce(reduceExtrema);
rangeExtrema = bufferArray.map(function (lineBuffer) {
return lineBuffer.getRangeExtrema();
}).reduce(reduceExtrema);
dimensions = (rangeExtrema[0] === rangeExtrema[1]) ?
[dimensionsOf(domainExtrema), 2.0 ] :
[dimensionsOf(domainExtrema), dimensionsOf(rangeExtrema)];
origin = [originOf(domainExtrema), originOf(rangeExtrema)];
} }
// Handle new telemetry data // Handle new telemetry data
function update() { function update() {
var objects = subscription.getTelemetryObjects(); var objects = handle.getTelemetryObjects();
bufferArray = objects.map(addData);
lengthArray = objects.map(function (obj) { // Initialize domain offset if necessary
return lengths[obj.getId()]; if (domainOffset === undefined) {
initializeDomainOffset(objects.map(function (obj) {
return handle.getDomainValue(obj, domain);
}));
}
// Make sure lines are available
prepareLines(objects);
// Add new data
objects.forEach(function (obj, index) {
lines[obj.getId()].addPoint(
handle.getDomainValue(obj, domain),
handle.getRangeValue(obj, range)
);
}); });
updateDomainExtrema(objects);
// Finally, update extrema
updateExtrema();
} }
// Prepare buffers and related state for this object // Add historical data for this domain object
function prepare(telemetryObject) { function setHistorical(domainObject, series) {
var id = telemetryObject.getId(); var count = series.getPointCount(),
lengths[id] = 0; line;
buffers[id] = new Float32Array(INITIAL_SIZE);
lengthArray.push(lengths[id]); // Nothing to do if it's an empty series
bufferArray.push(buffers[id]); if (count < 1) {
return;
}
// Initialize domain offset if necessary
if (domainOffset === undefined && series) {
initializeDomainOffset([
series.getDomainValue(0, domain),
series.getDomainValue(count - 1, domain)
]);
}
// Make sure lines are available
prepareLines(handle.getTelemetryObjects());
// Look up the line for this domain object
line = lines[domainObject.getId()];
// ...and put the data into it.
if (line) {
line.addSeries(series, domain, range);
}
// Finally, update extrema
updateExtrema();
} }
// Use a default MAX_POINTS if none is provided // Use a default MAX_POINTS if none is provided
@ -183,7 +191,7 @@ define(
// Initially prepare state for these objects. // Initially prepare state for these objects.
// Note that this may be an empty array at this time, // Note that this may be an empty array at this time,
// so we also need to check during update cycles. // so we also need to check during update cycles.
subscription.getTelemetryObjects().forEach(prepare); prepareLines(handle.getTelemetryObjects());
return { return {
/** /**
@ -193,10 +201,7 @@ define(
* @returns {number[]} the dimensions which bound this data set * @returns {number[]} the dimensions which bound this data set
*/ */
getDimensions: function () { getDimensions: function () {
// Pad range if necessary return dimensions;
return (max[1] === min[1]) ?
[max[0] - min[0], 2.0 ] :
[max[0] - min[0], max[1] - min[1]];
}, },
/** /**
* Get the origin of this data set's boundary. * Get the origin of this data set's boundary.
@ -207,7 +212,7 @@ define(
*/ */
getOrigin: function () { getOrigin: function () {
// Pad range if necessary // Pad range if necessary
return (max[1] === min[1]) ? [ min[0], min[1] - 1.0 ] : min; return origin;
}, },
/** /**
* Get the domain offset; this offset will have been subtracted * Get the domain offset; this offset will have been subtracted
@ -236,19 +241,9 @@ define(
* *
* @returns {Float32Array[]} the buffers for these traces * @returns {Float32Array[]} the buffers for these traces
*/ */
getBuffers: function () { getLineBuffers: function () {
return bufferArray; return bufferArray;
}, },
/**
* Get the number of points in the buffer with the specified
* index. Buffers are padded to minimize memory allocations,
* so user code will need this information to know how much
* data to plot.
* @returns {number} the number of points in this buffer
*/
getLength: function (index) {
return lengthArray[index] || 0;
},
/** /**
* Update with latest data. * Update with latest data.
*/ */

View File

@ -34,11 +34,11 @@ define(
subplot.setDomainOffset(prepared.getDomainOffset()); subplot.setDomainOffset(prepared.getDomainOffset());
// Draw the buffers. Select color by index. // Draw the buffers. Select color by index.
subplot.getDrawingObject().lines = prepared.getBuffers().map(function (buf, i) { subplot.getDrawingObject().lines = prepared.getLineBuffers().map(function (buf, i) {
return { return {
buffer: buf, buffer: buf.getBuffer(),
color: PlotPalette.getFloatColor(i), color: PlotPalette.getFloatColor(i),
points: prepared.getLength(i) points: buf.getLength()
}; };
}); });

View File

@ -23,7 +23,7 @@ define(
}); });
function plotTelemetryTo(subplot, prepared, index) { function plotTelemetryTo(subplot, prepared, index) {
var buffer = prepared.getBuffers()[index]; var buffer = prepared.getLineBuffers()[index];
// Track the domain offset, used to bias domain values // Track the domain offset, used to bias domain values
// to minimize loss of precision when converted to 32-bit // to minimize loss of precision when converted to 32-bit
@ -33,9 +33,9 @@ define(
// Draw the buffers. Always use the 0th color, because there // Draw the buffers. Always use the 0th color, because there
// is one line per plot. // is one line per plot.
subplot.getDrawingObject().lines = [{ subplot.getDrawingObject().lines = [{
buffer: buffer, buffer: buffer.getBuffer(),
color: PlotPalette.getFloatColor(0), color: PlotPalette.getFloatColor(0),
points: prepared.getLength(index) points: buffer.getLength()
}]; }];
subplot.update(); subplot.update();