Use throttle module for reporting current state

This simplifies the logic of throttling reports plus adds the benefit
of allowing reports requested during a pending report to still be called
after the pending report completes.

Closes: #1945
Change-type: minor
Signed-off-by: 20k-ultra <3946250+20k-ultra@users.noreply.github.com>
This commit is contained in:
20k-ultra 2022-06-14 17:08:45 -04:00
parent 51c5456af9
commit e28b3be33d

View File

@ -2,10 +2,10 @@ import * as url from 'url';
import * as _ from 'lodash'; import * as _ from 'lodash';
import { delay } from 'bluebird'; import { delay } from 'bluebird';
import { CoreOptions } from 'request'; import { CoreOptions } from 'request';
import { performance } from 'perf_hooks';
import * as constants from '../lib/constants'; import * as constants from '../lib/constants';
import { withBackoff, OnFailureInfo } from '../lib/backoff'; import { withBackoff, OnFailureInfo } from '../lib/backoff';
import throttle from '../lib/throttle';
import { log } from '../lib/supervisor-console'; import { log } from '../lib/supervisor-console';
import { InternalInconsistencyError, StatusError } from '../lib/errors'; import { InternalInconsistencyError, StatusError } from '../lib/errors';
import { getRequestInstance } from '../lib/request'; import { getRequestInstance } from '../lib/request';
@ -19,7 +19,6 @@ import * as deviceState from '../device-state';
import { shallowDiff, prune, empty } from '../lib/json'; import { shallowDiff, prune, empty } from '../lib/json';
let lastReport: DeviceState = {}; let lastReport: DeviceState = {};
let lastReportTime: number = -Infinity;
export let stateReportErrors = 0; export let stateReportErrors = 0;
type StateReportOpts = { type StateReportOpts = {
@ -64,30 +63,46 @@ async function report({ body, opts }: StateReport) {
} }
} }
async function reportCurrentState(opts: StateReportOpts) { interface ReportResult {
reported: boolean;
state: StateDiff;
}
interface StateDiff {
full: DeviceState;
diff: Partial<DeviceState>;
}
// Cache state difference for 0.25 second to prevent excessive CPU usage
const calculateStateDiff = throttle(async (): Promise<StateDiff> => {
const currentState = await deviceState.getCurrentForReport(lastReport);
return {
full: currentState,
diff: prune(shallowDiff(lastReport, currentState, 2)),
};
}, 250);
async function reportCurrentState(
opts: StateReportOpts,
cb: (r: ReportResult) => void,
) {
// Wrap the report with fetching of state so report always has the latest state diff // Wrap the report with fetching of state so report always has the latest state diff
const getStateAndReport = async () => { const getStateAndReport = async () => {
const now = performance.now(); const reportPayload: ReportResult = {
// Only try to report if enough time has elapsed since last report reported: false,
if (now - lastReportTime >= constants.maxReportFrequency) { state: await calculateStateDiff(),
const currentState = await deviceState.getCurrentForReport(lastReport); };
const stateDiff = prune(shallowDiff(lastReport, currentState, 2));
if (empty(stateDiff)) { if (empty(reportPayload.state.diff)) {
return; return cb(reportPayload);
} }
await report({ body: stateDiff, opts }); await report({ body: reportPayload.state.diff, opts });
lastReportTime = performance.now();
lastReport = currentState; cb({
log.info('Reported current state to the cloud'); ...reportPayload,
} else { reported: true,
// Not enough time has elapsed since last report });
// Delay report until next allowed time
const timeSinceLastReport = now - lastReportTime;
await delay(constants.maxReportFrequency - timeSinceLastReport);
await getStateAndReport();
}
}; };
// Create a report that will backoff on errors // Create a report that will backoff on errors
@ -100,7 +115,6 @@ async function reportCurrentState(opts: StateReportOpts) {
// Run in try block to avoid throwing any exceptions // Run in try block to avoid throwing any exceptions
try { try {
await reportWithBackoff(); await reportWithBackoff();
stateReportErrors = 0;
} catch (e) { } catch (e) {
log.error(e); log.error(e);
} }
@ -138,13 +152,25 @@ export async function startReporting() {
'appUpdatePollInterval', 'appUpdatePollInterval',
])) as StateReportOpts; ])) as StateReportOpts;
let reportPending = false; const throttledReport = throttle(
reportCurrentState,
constants.maxReportFrequency,
);
const doReport = async () => { const doReport = async () => {
if (!reportPending) { return new Promise<void>((resolve) => {
reportPending = true; throttledReport(reportConfigs, (result: ReportResult) => {
await reportCurrentState(reportConfigs); if (result.reported) {
reportPending = false; log.info('Reported current state to the cloud');
stateReportErrors = 0;
lastReport = result.state.full;
} else {
// No report was sent to reset the throttle
throttledReport.cancel();
} }
resolve();
});
});
}; };
// If the state changes, report it // If the state changes, report it