[Telemetry] Add telemetry subscriptions

Add ability to subscribe/unsubscribe to streaming
telemetry updates. This may be more performant than
polling in some circumstances, and appears necessary
for good performance of autoflow tabular views,
WTD-614.
This commit is contained in:
Victor Woeltjen 2014-12-29 13:36:53 -08:00
parent 7db5c0692b
commit 8c55a66320
3 changed files with 99 additions and 10 deletions

View File

@ -13,6 +13,7 @@ define(
* @constructor
*/
function SinewaveTelemetryProvider($q, $timeout) {
var subscriptions = [];
//
function matchesSource(request) {
@ -43,8 +44,48 @@ define(
}, 0);
}
function handleSubscriptions() {
subscriptions.forEach(function (subscription) {
var requests = subscription.requests;
subscription.callback(doPackage(
requests.filter(matchesSource).map(generateData)
));
});
}
function startGenerating() {
$timeout(function () {
handleSubscriptions();
if (subscriptions.length > 0) {
startGenerating();
}
}, 1000);
}
function subscribe(callback, requests) {
var subscription = {
callback: callback,
requests: requests
};
function unsubscribe() {
subscriptions = subscriptions.filter(function (s) {
return s !== subscription;
});
}
subscriptions.push(subscription);
if (subscriptions.length === 1) {
startGenerating();
}
return unsubscribe;
}
return {
requestTelemetry: requestTelemetry
requestTelemetry: requestTelemetry,
subscribe: subscribe
};
}

View File

@ -38,6 +38,23 @@ define(
})).then(mergeResults);
}
// Subscribe to updates from all providers
function subscribe(callback, requests) {
var unsubscribes = telemetryProviders.map(function (provider) {
return provider.subscribe(callback, requests);
});
// Return an unsubscribe function that invokes unsubscribe
// for all providers.
return function () {
unsubscribes.forEach(function (unsubscribe) {
if (unsubscribe) {
unsubscribe();
}
});
};
}
return {
/**
* Request telemetry data.

View File

@ -16,20 +16,22 @@ define(
* @constructor
*/
function TelemetryCapability($injector, $q, $log, domainObject) {
var telemetryService;
var telemetryService,
subscriptions = [],
unsubscribeFunction;
// We could depend on telemetryService directly, but
// there isn't a platform implementation of this;
function getTelemetryService() {
if (!telemetryService) {
if (telemetryService === undefined) {
try {
telemetryService =
$q.when($injector.get("telemetryService"));
$injector.get("telemetryService");
} catch (e) {
// $injector should throw is telemetryService
// is unavailable or unsatisfiable.
$log.warn("Telemetry service unavailable");
telemetryService = $q.reject(e);
telemetryService = null;
}
}
return telemetryService;
@ -83,16 +85,34 @@ define(
}
// Issue a request to the service
function requestTelemetryFromService(telemetryService) {
function requestTelemetryFromService() {
return telemetryService.requestTelemetry([fullRequest]);
}
// If a telemetryService is not available,
// getTelemetryService() should reject, and this should
// bubble through subsequent then calls.
return getTelemetryService()
.then(requestTelemetryFromService)
.then(getRelevantResponse);
return getTelemetryService() &&
requestTelemetryFromService()
.then(getRelevantResponse);
}
// Listen for real-time and/or streaming updates
function subscribe(callback, request) {
var fullRequest = buildRequest(request || {});
// Unpack the relevant telemetry series
function update(telemetries) {
var source = fullRequest.source,
key = fullRequest.key,
result = ((telemetries || {})[source] || {})[key];
if (result) {
callback(result);
}
}
return getTelemetryService() &&
telemetryService.subscribe(update, [fullRequest]);
}
return {
@ -115,7 +135,18 @@ define(
// type-level and object-level telemetry
// properties
return buildRequest({});
}
},
/**
* Subscribe to updates to telemetry data for this domain
* object.
* @param {Function} callback a function to call when new
* data becomes available; the telemetry series
* containing the data will be given as an argument.
* @param {TelemetryRequest} [request] parameters for the
* subscription request
*/
subscribe: subscribe
};
}