mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-02-21 02:01:35 +00:00
Merge pull request #1411 from balena-io/extract-current-state-report
Extract current state reporting to its own module
This commit is contained in:
commit
3a16a796d8
@ -5,7 +5,7 @@ import * as express from 'express';
|
||||
import { isLeft } from 'fp-ts/lib/Either';
|
||||
import * as t from 'io-ts';
|
||||
import * as _ from 'lodash';
|
||||
import { PinejsClientRequest, StatusError } from 'pinejs-client-request';
|
||||
import { PinejsClientRequest } from 'pinejs-client-request';
|
||||
import * as url from 'url';
|
||||
import * as deviceRegister from './lib/register-device';
|
||||
|
||||
@ -14,7 +14,6 @@ import * as deviceConfig from './device-config';
|
||||
import * as eventTracker from './event-tracker';
|
||||
import { loadBackupFromMigration } from './lib/migration';
|
||||
|
||||
import constants = require('./lib/constants');
|
||||
import {
|
||||
ContractValidationError,
|
||||
ContractViolationError,
|
||||
@ -23,24 +22,15 @@ import {
|
||||
isHttpConflictError,
|
||||
} from './lib/errors';
|
||||
import * as request from './lib/request';
|
||||
import { DeviceStatus } from './types/state';
|
||||
|
||||
import log from './lib/supervisor-console';
|
||||
|
||||
import DeviceState from './device-state';
|
||||
import * as globalEventBus from './event-bus';
|
||||
import * as TargetState from './device-state/target-state';
|
||||
import * as CurrentState from './device-state/current-state';
|
||||
import * as logger from './logger';
|
||||
|
||||
// The exponential backoff starts at 15s
|
||||
const MINIMUM_BACKOFF_DELAY = 15000;
|
||||
|
||||
const INTERNAL_STATE_KEYS = [
|
||||
'update_pending',
|
||||
'update_downloaded',
|
||||
'update_failed',
|
||||
];
|
||||
|
||||
interface Device {
|
||||
id: number;
|
||||
|
||||
@ -66,16 +56,6 @@ export class APIBinder {
|
||||
private deviceState: DeviceState;
|
||||
|
||||
public balenaApi: PinejsClientRequest | null = null;
|
||||
private lastReportedState: DeviceStatus = {
|
||||
local: {},
|
||||
dependent: {},
|
||||
};
|
||||
private stateForReport: DeviceStatus = {
|
||||
local: {},
|
||||
dependent: {},
|
||||
};
|
||||
private reportPending = false;
|
||||
private stateReportErrors = 0;
|
||||
private readyForUpdates = false;
|
||||
|
||||
public constructor() {
|
||||
@ -125,7 +105,7 @@ export class APIBinder {
|
||||
const stateReportHealthy =
|
||||
!connectivityCheckEnabled ||
|
||||
!this.deviceState.connected ||
|
||||
this.stateReportErrors < 3;
|
||||
CurrentState.stateReportErrors < 3;
|
||||
|
||||
if (!stateReportHealthy) {
|
||||
log.info(
|
||||
@ -133,7 +113,7 @@ export class APIBinder {
|
||||
Healthcheck failure - Atleast ONE of the following conditions must be true:
|
||||
- No connectivityCheckEnabled ? ${!(connectivityCheckEnabled === true)}
|
||||
- device state is disconnected ? ${!(this.deviceState.connected === true)}
|
||||
- stateReportErrors less then 3 ? ${this.stateReportErrors < 3}`,
|
||||
- stateReportErrors less then 3 ? ${CurrentState.stateReportErrors < 3}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
@ -215,7 +195,7 @@ export class APIBinder {
|
||||
}
|
||||
|
||||
log.debug('Starting current state report');
|
||||
await this.startCurrentStateReport();
|
||||
await CurrentState.startReporting(this.deviceState);
|
||||
|
||||
// When we've provisioned, try to load the backup. We
|
||||
// must wait for the provisioning because we need a
|
||||
@ -356,22 +336,6 @@ export class APIBinder {
|
||||
).timeout(conf.apiTimeout)) as Device;
|
||||
}
|
||||
|
||||
public startCurrentStateReport() {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'Trying to start state reporting without initializing API client',
|
||||
);
|
||||
}
|
||||
this.deviceState.on('change', () => {
|
||||
if (!this.reportPending) {
|
||||
// A latency of 100ms should be acceptable and
|
||||
// allows avoiding catching docker at weird states
|
||||
this.reportCurrentState();
|
||||
}
|
||||
});
|
||||
return this.reportCurrentState();
|
||||
}
|
||||
|
||||
public async fetchDeviceTags(): Promise<DeviceTag[]> {
|
||||
if (this.balenaApi == null) {
|
||||
throw new Error(
|
||||
@ -411,160 +375,6 @@ export class APIBinder {
|
||||
});
|
||||
}
|
||||
|
||||
private getStateDiff(): DeviceStatus {
|
||||
const lastReportedLocal = this.lastReportedState.local;
|
||||
const lastReportedDependent = this.lastReportedState.dependent;
|
||||
if (lastReportedLocal == null || lastReportedDependent == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No local or dependent component of lastReportedLocal in ApiBinder.getStateDiff: ${JSON.stringify(
|
||||
this.lastReportedState,
|
||||
)}`,
|
||||
);
|
||||
}
|
||||
|
||||
const diff = {
|
||||
local: _(this.stateForReport.local)
|
||||
.omitBy((val, key: keyof DeviceStatus['local']) =>
|
||||
_.isEqual(lastReportedLocal[key], val),
|
||||
)
|
||||
.omit(INTERNAL_STATE_KEYS)
|
||||
.value(),
|
||||
dependent: _(this.stateForReport.dependent)
|
||||
.omitBy((val, key: keyof DeviceStatus['dependent']) =>
|
||||
_.isEqual(lastReportedDependent[key], val),
|
||||
)
|
||||
.omit(INTERNAL_STATE_KEYS)
|
||||
.value(),
|
||||
};
|
||||
|
||||
return _.omitBy(diff, _.isEmpty);
|
||||
}
|
||||
|
||||
private async sendReportPatch(
|
||||
stateDiff: DeviceStatus,
|
||||
conf: { apiEndpoint: string; uuid: string; localMode: boolean },
|
||||
) {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'Attempt to send report patch without an API client',
|
||||
);
|
||||
}
|
||||
|
||||
let body = stateDiff;
|
||||
if (conf.localMode) {
|
||||
body = this.stripDeviceStateInLocalMode(stateDiff);
|
||||
// In local mode, check if it still makes sense to send any updates after data strip.
|
||||
if (_.isEmpty(body.local)) {
|
||||
// Nothing to send.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const endpoint = url.resolve(
|
||||
conf.apiEndpoint,
|
||||
`/device/v2/${conf.uuid}/state`,
|
||||
);
|
||||
|
||||
const requestParams = _.extend(
|
||||
{
|
||||
method: 'PATCH',
|
||||
url: endpoint,
|
||||
body,
|
||||
},
|
||||
this.balenaApi.passthrough,
|
||||
);
|
||||
|
||||
await this.balenaApi._request(requestParams);
|
||||
}
|
||||
|
||||
// Returns an object that contains only status fields relevant for the local mode.
|
||||
// It basically removes information about applications state.
|
||||
public stripDeviceStateInLocalMode(state: DeviceStatus): DeviceStatus {
|
||||
return {
|
||||
local: _.cloneDeep(
|
||||
_.omit(state.local, 'apps', 'is_on__commit', 'logs_channel'),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
private report = _.throttle(async () => {
|
||||
const conf = await config.getMany([
|
||||
'deviceId',
|
||||
'apiTimeout',
|
||||
'apiEndpoint',
|
||||
'uuid',
|
||||
'localMode',
|
||||
]);
|
||||
|
||||
const stateDiff = this.getStateDiff();
|
||||
if (_.size(stateDiff) === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const { apiEndpoint, uuid, localMode } = conf;
|
||||
if (uuid == null || apiEndpoint == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'No uuid or apiEndpoint provided to ApiBinder.report',
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
await Bluebird.resolve(
|
||||
this.sendReportPatch(stateDiff, { apiEndpoint, uuid, localMode }),
|
||||
).timeout(conf.apiTimeout);
|
||||
|
||||
this.stateReportErrors = 0;
|
||||
_.assign(this.lastReportedState.local, stateDiff.local);
|
||||
_.assign(this.lastReportedState.dependent, stateDiff.dependent);
|
||||
} catch (e) {
|
||||
if (e instanceof StatusError) {
|
||||
// We don't want this to be classed as a report error, as this will cause
|
||||
// the watchdog to kill the supervisor - and killing the supervisor will
|
||||
// not help in this situation
|
||||
log.error(
|
||||
`Non-200 response from the API! Status code: ${e.statusCode} - message:`,
|
||||
e,
|
||||
);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}, constants.maxReportFrequency);
|
||||
|
||||
private reportCurrentState(): null {
|
||||
(async () => {
|
||||
this.reportPending = true;
|
||||
try {
|
||||
const currentDeviceState = await this.deviceState.getStatus();
|
||||
_.assign(this.stateForReport.local, currentDeviceState.local);
|
||||
_.assign(this.stateForReport.dependent, currentDeviceState.dependent);
|
||||
|
||||
const stateDiff = this.getStateDiff();
|
||||
if (_.size(stateDiff) === 0) {
|
||||
this.reportPending = false;
|
||||
return null;
|
||||
}
|
||||
|
||||
await this.report();
|
||||
this.reportCurrentState();
|
||||
} catch (e) {
|
||||
eventTracker.track('Device state report failure', { error: e });
|
||||
// We use the poll interval as the upper limit of
|
||||
// the exponential backoff
|
||||
const maxDelay = await config.get('appUpdatePollInterval');
|
||||
const delay = Math.min(
|
||||
2 ** this.stateReportErrors * MINIMUM_BACKOFF_DELAY,
|
||||
maxDelay,
|
||||
);
|
||||
|
||||
++this.stateReportErrors;
|
||||
await Bluebird.delay(delay);
|
||||
this.reportCurrentState();
|
||||
}
|
||||
})();
|
||||
return null;
|
||||
}
|
||||
|
||||
private async pinDevice({ app, commit }: DevicePinInfo) {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
|
214
src/device-state/current-state.ts
Normal file
214
src/device-state/current-state.ts
Normal file
@ -0,0 +1,214 @@
|
||||
import Bluebird = require('bluebird');
|
||||
import constants = require('../lib/constants');
|
||||
import log from '../lib/supervisor-console';
|
||||
import * as _ from 'lodash';
|
||||
import { InternalInconsistencyError } from '../lib/errors';
|
||||
import { DeviceStatus } from '../types/state';
|
||||
import { getRequestInstance } from '../lib/request';
|
||||
import * as config from '../config';
|
||||
import * as eventTracker from '../event-tracker';
|
||||
import DeviceState from '../device-state';
|
||||
import { CoreOptions } from 'request';
|
||||
import * as url from 'url';
|
||||
|
||||
// The exponential backoff starts at 15s
|
||||
const MINIMUM_BACKOFF_DELAY = 15000;
|
||||
|
||||
const INTERNAL_STATE_KEYS = [
|
||||
'update_pending',
|
||||
'update_downloaded',
|
||||
'update_failed',
|
||||
];
|
||||
|
||||
let deviceState: DeviceState;
|
||||
export let stateReportErrors = 0;
|
||||
const lastReportedState: DeviceStatus = {
|
||||
local: {},
|
||||
dependent: {},
|
||||
};
|
||||
const stateForReport: DeviceStatus = {
|
||||
local: {},
|
||||
dependent: {},
|
||||
};
|
||||
let reportPending = false;
|
||||
|
||||
class StatusError extends Error {
|
||||
constructor(public statusCode: number) {
|
||||
super();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an object that contains only status fields relevant for the local mode.
|
||||
* It basically removes information about applications state.
|
||||
*
|
||||
* Exported for tests
|
||||
*/
|
||||
export const stripDeviceStateInLocalMode = (
|
||||
state: DeviceStatus,
|
||||
): DeviceStatus => {
|
||||
return {
|
||||
local: _.cloneDeep(
|
||||
_.omit(state.local, 'apps', 'is_on__commit', 'logs_channel'),
|
||||
),
|
||||
};
|
||||
};
|
||||
|
||||
const sendReportPatch = async (
|
||||
stateDiff: DeviceStatus,
|
||||
conf: { apiEndpoint: string; uuid: string; localMode: boolean },
|
||||
) => {
|
||||
let body = stateDiff;
|
||||
if (conf.localMode) {
|
||||
body = stripDeviceStateInLocalMode(stateDiff);
|
||||
// In local mode, check if it still makes sense to send any updates after data strip.
|
||||
if (_.isEmpty(body.local)) {
|
||||
// Nothing to send.
|
||||
return;
|
||||
}
|
||||
}
|
||||
const { uuid, apiEndpoint, apiTimeout, deviceApiKey } = await config.getMany([
|
||||
'uuid',
|
||||
'apiEndpoint',
|
||||
'apiTimeout',
|
||||
'deviceApiKey',
|
||||
]);
|
||||
|
||||
const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`);
|
||||
const request = await getRequestInstance();
|
||||
|
||||
const params: CoreOptions = {
|
||||
json: true,
|
||||
headers: {
|
||||
Authorization: `Bearer ${deviceApiKey}`,
|
||||
},
|
||||
body,
|
||||
};
|
||||
|
||||
const [{ statusCode }] = await request
|
||||
.patchAsync(endpoint, params)
|
||||
.timeout(apiTimeout);
|
||||
|
||||
if (statusCode < 200 || statusCode >= 300) {
|
||||
log.error(`Error from the API: ${statusCode}`);
|
||||
throw new StatusError(statusCode);
|
||||
}
|
||||
};
|
||||
|
||||
const getStateDiff = (): DeviceStatus => {
|
||||
const lastReportedLocal = lastReportedState.local;
|
||||
const lastReportedDependent = lastReportedState.dependent;
|
||||
if (lastReportedLocal == null || lastReportedDependent == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No local or dependent component of lastReportedLocal in ApiBinder.getStateDiff: ${JSON.stringify(
|
||||
lastReportedState,
|
||||
)}`,
|
||||
);
|
||||
}
|
||||
|
||||
const diff = {
|
||||
local: _(stateForReport.local)
|
||||
.omitBy((val, key: keyof DeviceStatus['local']) =>
|
||||
_.isEqual(lastReportedLocal[key], val),
|
||||
)
|
||||
.omit(INTERNAL_STATE_KEYS)
|
||||
.value(),
|
||||
dependent: _(stateForReport.dependent)
|
||||
.omitBy((val, key: keyof DeviceStatus['dependent']) =>
|
||||
_.isEqual(lastReportedDependent[key], val),
|
||||
)
|
||||
.omit(INTERNAL_STATE_KEYS)
|
||||
.value(),
|
||||
};
|
||||
|
||||
return _.omitBy(diff, _.isEmpty);
|
||||
};
|
||||
|
||||
const report = _.throttle(async () => {
|
||||
const conf = await config.getMany([
|
||||
'deviceId',
|
||||
'apiTimeout',
|
||||
'apiEndpoint',
|
||||
'uuid',
|
||||
'localMode',
|
||||
]);
|
||||
|
||||
const stateDiff = getStateDiff();
|
||||
if (_.size(stateDiff) === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const { apiEndpoint, uuid, localMode } = conf;
|
||||
if (uuid == null || apiEndpoint == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'No uuid or apiEndpoint provided to ApiBinder.report',
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
await sendReportPatch(stateDiff, { apiEndpoint, uuid, localMode });
|
||||
|
||||
stateReportErrors = 0;
|
||||
_.assign(lastReportedState.local, stateDiff.local);
|
||||
_.assign(lastReportedState.dependent, stateDiff.dependent);
|
||||
} catch (e) {
|
||||
if (e instanceof StatusError) {
|
||||
// We don't want this to be classed as a report error, as this will cause
|
||||
// the watchdog to kill the supervisor - and killing the supervisor will
|
||||
// not help in this situation
|
||||
log.error(
|
||||
`Non-200 response from the API! Status code: ${e.statusCode} - message:`,
|
||||
e,
|
||||
);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}, constants.maxReportFrequency);
|
||||
|
||||
const reportCurrentState = (): null => {
|
||||
(async () => {
|
||||
reportPending = true;
|
||||
try {
|
||||
const currentDeviceState = await deviceState.getStatus();
|
||||
_.assign(stateForReport.local, currentDeviceState.local);
|
||||
_.assign(stateForReport.dependent, currentDeviceState.dependent);
|
||||
|
||||
const stateDiff = getStateDiff();
|
||||
if (_.size(stateDiff) === 0) {
|
||||
reportPending = false;
|
||||
return null;
|
||||
}
|
||||
|
||||
await report();
|
||||
reportCurrentState();
|
||||
} catch (e) {
|
||||
eventTracker.track('Device state report failure', { error: e });
|
||||
// We use the poll interval as the upper limit of
|
||||
// the exponential backoff
|
||||
const maxDelay = await config.get('appUpdatePollInterval');
|
||||
const delay = Math.min(
|
||||
2 ** stateReportErrors * MINIMUM_BACKOFF_DELAY,
|
||||
maxDelay,
|
||||
);
|
||||
|
||||
++stateReportErrors;
|
||||
await Bluebird.delay(delay);
|
||||
reportCurrentState();
|
||||
}
|
||||
})();
|
||||
return null;
|
||||
};
|
||||
|
||||
// TODO: Remove the passing in of deviceState once it's a singleton
|
||||
export const startReporting = ($deviceState: typeof deviceState) => {
|
||||
deviceState = $deviceState;
|
||||
deviceState.on('change', () => {
|
||||
if (!reportPending) {
|
||||
// A latency of 100ms should be acceptable and
|
||||
// allows avoiding catching docker at weird states
|
||||
reportCurrentState();
|
||||
}
|
||||
});
|
||||
return reportCurrentState();
|
||||
};
|
@ -30,6 +30,10 @@ type PromisifiedRequest = typeof requestLib & {
|
||||
uri: string | requestLib.CoreOptions,
|
||||
options?: requestLib.CoreOptions | undefined,
|
||||
) => Bluebird<[requestLib.Response, any]>;
|
||||
patchAsync: (
|
||||
uri: string | requestLib.CoreOptions,
|
||||
options?: requestLib.CoreOptions | undefined,
|
||||
) => Bluebird<[requestLib.Response, any]>;
|
||||
getAsync: (
|
||||
uri: string | requestLib.CoreOptions,
|
||||
options?: requestLib.CoreOptions | undefined,
|
||||
|
@ -15,6 +15,7 @@ import { schema } from '../src/config/schema';
|
||||
import ConfigJsonConfigBackend from '../src/config/configJson';
|
||||
import * as TargetState from '../src/device-state/target-state';
|
||||
import { DeviceStatus } from '../src/types/state';
|
||||
import * as CurrentState from '../src/device-state/current-state';
|
||||
|
||||
const { expect } = chai;
|
||||
|
||||
@ -330,7 +331,7 @@ describe('ApiBinder', () => {
|
||||
} as DeviceStatus;
|
||||
|
||||
it('should strip applications data', () => {
|
||||
const result = components.apiBinder.stripDeviceStateInLocalMode(
|
||||
const result = CurrentState.stripDeviceStateInLocalMode(
|
||||
sampleState,
|
||||
) as Dictionary<any>;
|
||||
expect(result).to.not.have.property('dependent');
|
||||
@ -442,7 +443,8 @@ describe('ApiBinder', () => {
|
||||
const previousDeviceStateConnected =
|
||||
components.apiBinder.deviceState.connected;
|
||||
// Set additional conditions not in configStub to cause a fail
|
||||
components.apiBinder.stateReportErrors = 4;
|
||||
// @ts-expect-error
|
||||
CurrentState.stateReportErrors = 4;
|
||||
components.apiBinder.deviceState.connected = true;
|
||||
expect(await components.apiBinder.healthcheck()).to.equal(false);
|
||||
expect(Log.info).to.be.calledOnce;
|
||||
@ -454,7 +456,8 @@ describe('ApiBinder', () => {
|
||||
- stateReportErrors less then 3 ? false`,
|
||||
);
|
||||
// Restore previous values
|
||||
components.apiBinder.stateReportErrors = previousStateReportErrors;
|
||||
// @ts-expect-error
|
||||
CurrentState.stateReportErrors = previousStateReportErrors;
|
||||
components.apiBinder.deviceState.connected = previousDeviceStateConnected;
|
||||
});
|
||||
});
|
||||
|
Loading…
x
Reference in New Issue
Block a user