diff --git a/src/api-binder/report.ts b/src/api-binder/report.ts index 76be1449..7c4f21dc 100644 --- a/src/api-binder/report.ts +++ b/src/api-binder/report.ts @@ -2,10 +2,10 @@ import * as url from 'url'; import * as _ from 'lodash'; import { delay } from 'bluebird'; import { CoreOptions } from 'request'; -import { performance } from 'perf_hooks'; import * as constants from '../lib/constants'; import { withBackoff, OnFailureInfo } from '../lib/backoff'; +import { throttle } from '../lib/throttle'; import { log } from '../lib/supervisor-console'; import { InternalInconsistencyError, StatusError } from '../lib/errors'; import { getRequestInstance } from '../lib/request'; @@ -19,7 +19,6 @@ import * as deviceState from '../device-state'; import { shallowDiff, prune, empty } from '../lib/json'; let lastReport: DeviceState = {}; -let lastReportTime: number = -Infinity; export let stateReportErrors = 0; type StateReportOpts = { @@ -64,30 +63,20 @@ async function report({ body, opts }: StateReport) { } } -async function reportCurrentState(opts: StateReportOpts) { +async function reportCurrentState(opts: StateReportOpts): Promise { // Wrap the report with fetching of state so report always has the latest state diff const getStateAndReport = async () => { - const now = performance.now(); - // Only try to report if enough time has elapsed since last report - if (now - lastReportTime >= constants.maxReportFrequency) { - const currentState = await deviceState.getCurrentForReport(lastReport); - const stateDiff = prune(shallowDiff(lastReport, currentState, 2)); + const currentState = await deviceState.getCurrentForReport(lastReport); + const stateDiff = prune(shallowDiff(lastReport, currentState, 2)); - if (empty(stateDiff)) { - return; - } - - await report({ body: stateDiff, opts }); - lastReportTime = performance.now(); - lastReport = currentState; - log.info('Reported current state to the cloud'); - } else { - // Not enough time has elapsed since last report - // Delay report until next allowed time - const timeSinceLastReport = now - lastReportTime; - await delay(constants.maxReportFrequency - timeSinceLastReport); - await getStateAndReport(); + if (empty(stateDiff)) { + return false; } + + await report({ body: stateDiff, opts }); + lastReport = currentState; + log.info('Reported current state to the cloud'); + return true; }; // Create a report that will backoff on errors @@ -97,13 +86,20 @@ async function reportCurrentState(opts: StateReportOpts) { onFailure: handleRetry, }); - // Run in try block to avoid throwing any exceptions + // Track if a report is actually sent since nothing + // is sent when nothing has changed from our last report + let sentReport = false; try { - await reportWithBackoff(); + sentReport = await reportWithBackoff(); stateReportErrors = 0; } catch (e) { + // The backoff will catch all errors and retry forever + // This error must have happened if something really unlikely happens + // such as an error in the backoff module log.error(e); } + + return sentReport; } function handleRetry(retryInfo: OnFailureInfo) { @@ -138,22 +134,15 @@ export async function startReporting() { 'appUpdatePollInterval', ])) as StateReportOpts; - let reportPending = false; - const doReport = async () => { - if (!reportPending) { - reportPending = true; - await reportCurrentState(reportConfigs); - reportPending = false; - } - }; - - // If the state changes, report it - deviceState.on('change', doReport); + // Throttle reporting only when a report was actually sent + const throttledReport = throttle(reportCurrentState, 1000, { + throttleOn: (didReport: boolean) => didReport, + }); async function recursivelyReport(delayBy: number) { try { // Try to send current state - await doReport(); + await throttledReport(reportConfigs); } finally { // Wait until we want to report again await delay(delayBy); @@ -162,7 +151,12 @@ export async function startReporting() { } } - // Start monitoring for changes that do not trigger deviceState events + // If the state changes, report it + deviceState.on('change', () => { + throttledReport(reportConfigs); + }); + + // Otherwise, start monitoring for changes that do not trigger deviceState events // Example - device metrics return recursivelyReport(constants.maxReportFrequency); } diff --git a/src/lib/throttle.ts b/src/lib/throttle.ts new file mode 100644 index 00000000..490938b6 --- /dev/null +++ b/src/lib/throttle.ts @@ -0,0 +1,42 @@ +import { strict as assert } from 'assert'; +import { performance } from 'perf_hooks'; + +export interface Options { + throttleOn: ((...args: any[]) => boolean) | null; +} + +const DEFAULT_OPTIONS: Partial = { + throttleOn: null, +}; + +export function throttle any>( + func: T, + delay: number, + options: Partial = {}, +) { + assert(typeof func === 'function', 'expected a function as parameter'); + // If the function returns a promise, unwrap the promise return type + // otherwise use the actual return + type TReturn = ReturnType extends Promise ? R : ReturnType; + + const normalizedOptions: Options = { + ...DEFAULT_OPTIONS, + ...options, + } as Options; + + let lastReportTime: number = -Infinity; + let cachedReturn: TReturn; + + return async function wrapped(...args: Parameters): Promise { + const now = performance.now(); + // Only call func if enough time has elapsed since the last time we called it + if (now - lastReportTime >= delay) { + cachedReturn = await func(...args); + lastReportTime = now; + return cachedReturn; + } else { + return cachedReturn; + } + // TODO: add a way to defer calls.... leading/trailing options + }; +}