Use throttle for reporting state

This ensures that a trailing call is executed as well
as ensuring the throttle function is only ran within the allowed
frequency

Closes: #1945
Change-type: minor
Signed-off-by: 20k-ultra <3946250+20k-ultra@users.noreply.github.com>
This commit is contained in:
20k-ultra 2022-06-06 17:33:28 -04:00
parent d9186649a6
commit 40b809571c
2 changed files with 73 additions and 37 deletions

View File

@ -2,10 +2,10 @@ import * as url from 'url';
import * as _ from 'lodash';
import { delay } from 'bluebird';
import { CoreOptions } from 'request';
import { performance } from 'perf_hooks';
import * as constants from '../lib/constants';
import { withBackoff, OnFailureInfo } from '../lib/backoff';
import { throttle } from '../lib/throttle';
import { log } from '../lib/supervisor-console';
import { InternalInconsistencyError, StatusError } from '../lib/errors';
import { getRequestInstance } from '../lib/request';
@ -19,7 +19,6 @@ import * as deviceState from '../device-state';
import { shallowDiff, prune, empty } from '../lib/json';
let lastReport: DeviceState = {};
let lastReportTime: number = -Infinity;
export let stateReportErrors = 0;
type StateReportOpts = {
@ -64,30 +63,20 @@ async function report({ body, opts }: StateReport) {
}
}
async function reportCurrentState(opts: StateReportOpts) {
async function reportCurrentState(opts: StateReportOpts): Promise<boolean> {
// Wrap the report with fetching of state so report always has the latest state diff
const getStateAndReport = async () => {
const now = performance.now();
// Only try to report if enough time has elapsed since last report
if (now - lastReportTime >= constants.maxReportFrequency) {
const currentState = await deviceState.getCurrentForReport(lastReport);
const stateDiff = prune(shallowDiff(lastReport, currentState, 2));
const currentState = await deviceState.getCurrentForReport(lastReport);
const stateDiff = prune(shallowDiff(lastReport, currentState, 2));
if (empty(stateDiff)) {
return;
}
await report({ body: stateDiff, opts });
lastReportTime = performance.now();
lastReport = currentState;
log.info('Reported current state to the cloud');
} else {
// Not enough time has elapsed since last report
// Delay report until next allowed time
const timeSinceLastReport = now - lastReportTime;
await delay(constants.maxReportFrequency - timeSinceLastReport);
await getStateAndReport();
if (empty(stateDiff)) {
return false;
}
await report({ body: stateDiff, opts });
lastReport = currentState;
log.info('Reported current state to the cloud');
return true;
};
// Create a report that will backoff on errors
@ -97,13 +86,20 @@ async function reportCurrentState(opts: StateReportOpts) {
onFailure: handleRetry,
});
// Run in try block to avoid throwing any exceptions
// Track if a report is actually sent since nothing
// is sent when nothing has changed from our last report
let sentReport = false;
try {
await reportWithBackoff();
sentReport = await reportWithBackoff();
stateReportErrors = 0;
} catch (e) {
// The backoff will catch all errors and retry forever
// This error must have happened if something really unlikely happens
// such as an error in the backoff module
log.error(e);
}
return sentReport;
}
function handleRetry(retryInfo: OnFailureInfo) {
@ -138,22 +134,15 @@ export async function startReporting() {
'appUpdatePollInterval',
])) as StateReportOpts;
let reportPending = false;
const doReport = async () => {
if (!reportPending) {
reportPending = true;
await reportCurrentState(reportConfigs);
reportPending = false;
}
};
// If the state changes, report it
deviceState.on('change', doReport);
// Throttle reporting only when a report was actually sent
const throttledReport = throttle(reportCurrentState, 1000, {
throttleOn: (didReport: boolean) => didReport,
});
async function recursivelyReport(delayBy: number) {
try {
// Try to send current state
await doReport();
await throttledReport(reportConfigs);
} finally {
// Wait until we want to report again
await delay(delayBy);
@ -162,7 +151,12 @@ export async function startReporting() {
}
}
// Start monitoring for changes that do not trigger deviceState events
// If the state changes, report it
deviceState.on('change', () => {
throttledReport(reportConfigs);
});
// Otherwise, start monitoring for changes that do not trigger deviceState events
// Example - device metrics
return recursivelyReport(constants.maxReportFrequency);
}

42
src/lib/throttle.ts Normal file
View File

@ -0,0 +1,42 @@
import { strict as assert } from 'assert';
import { performance } from 'perf_hooks';
export interface Options {
throttleOn: ((...args: any[]) => boolean) | null;
}
const DEFAULT_OPTIONS: Partial<Options> = {
throttleOn: null,
};
export function throttle<T extends (...args: any[]) => any>(
func: T,
delay: number,
options: Partial<Options> = {},
) {
assert(typeof func === 'function', 'expected a function as parameter');
// If the function returns a promise, unwrap the promise return type
// otherwise use the actual return
type TReturn = ReturnType<T> extends Promise<infer R> ? R : ReturnType<T>;
const normalizedOptions: Options = {
...DEFAULT_OPTIONS,
...options,
} as Options;
let lastReportTime: number = -Infinity;
let cachedReturn: TReturn;
return async function wrapped(...args: Parameters<T>): Promise<TReturn> {
const now = performance.now();
// Only call func if enough time has elapsed since the last time we called it
if (now - lastReportTime >= delay) {
cachedReturn = await func(...args);
lastReportTime = now;
return cachedReturn;
} else {
return cachedReturn;
}
// TODO: add a way to defer calls.... leading/trailing options
};
}