mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2024-12-20 06:07:57 +00:00
Merge pull request #1332 from balena-io/isolate-target-state
Isolate target state
This commit is contained in:
commit
b0be6c1925
@ -22,14 +22,14 @@ import {
|
||||
isHttpConflictError,
|
||||
} from './lib/errors';
|
||||
import * as request from './lib/request';
|
||||
import { writeLock } from './lib/update-lock';
|
||||
import { DeviceStatus, TargetState } from './types/state';
|
||||
import { DeviceStatus } from './types/state';
|
||||
|
||||
import log from './lib/supervisor-console';
|
||||
|
||||
import DeviceState from './device-state';
|
||||
import * as globalEventBus from './event-bus';
|
||||
import Logger from './logger';
|
||||
import * as TargetState from './device-state/target-state';
|
||||
|
||||
// The exponential backoff starts at 15s
|
||||
const MINIMUM_BACKOFF_DELAY = 15000;
|
||||
@ -72,8 +72,6 @@ export class APIBinder {
|
||||
private logger: Logger;
|
||||
|
||||
public balenaApi: PinejsClientRequest | null = null;
|
||||
// TODO{type}: Retype me when all types are sorted
|
||||
private cachedBalenaApi: PinejsClientRequest | null = null;
|
||||
private lastReportedState: DeviceStatus = {
|
||||
local: {},
|
||||
dependent: {},
|
||||
@ -82,11 +80,8 @@ export class APIBinder {
|
||||
local: {},
|
||||
dependent: {},
|
||||
};
|
||||
private lastTarget: TargetState;
|
||||
private lastTargetStateFetch = process.hrtime();
|
||||
private reportPending = false;
|
||||
private stateReportErrors = 0;
|
||||
private targetStateFetchErrors = 0;
|
||||
private readyForUpdates = false;
|
||||
|
||||
public constructor({ eventTracker, logger }: APIBinderConstructOpts) {
|
||||
@ -124,7 +119,7 @@ export class APIBinder {
|
||||
}
|
||||
|
||||
// Check last time target state has been polled
|
||||
const timeSinceLastFetch = process.hrtime(this.lastTargetStateFetch);
|
||||
const timeSinceLastFetch = process.hrtime(TargetState.lastFetch);
|
||||
const timeSinceLastFetchMs =
|
||||
timeSinceLastFetch[0] * 1000 + timeSinceLastFetch[1] / 1e6;
|
||||
|
||||
@ -177,7 +172,6 @@ export class APIBinder {
|
||||
apiPrefix: baseUrl,
|
||||
passthrough,
|
||||
});
|
||||
this.cachedBalenaApi = this.balenaApi.clone({}, { cache: {} });
|
||||
}
|
||||
|
||||
public async start() {
|
||||
@ -232,7 +226,37 @@ export class APIBinder {
|
||||
|
||||
this.readyForUpdates = true;
|
||||
log.debug('Starting target state poll');
|
||||
this.startTargetStatePoll();
|
||||
TargetState.startPoll();
|
||||
TargetState.emitter.on(
|
||||
'target-state-update',
|
||||
async (targetState, force, isFromApi) => {
|
||||
try {
|
||||
await this.deviceState.setTarget(targetState);
|
||||
this.deviceState.triggerApplyTarget({ force, isFromApi });
|
||||
} catch (err) {
|
||||
if (
|
||||
err instanceof ContractValidationError ||
|
||||
err instanceof ContractViolationError
|
||||
) {
|
||||
log.error(`Could not store target state for device: ${err}`);
|
||||
// the dashboard does not display lines correctly,
|
||||
// split them explcitly here
|
||||
const lines = err.message.split(/\r?\n/);
|
||||
lines[0] = `Could not move to new release: ${lines[0]}`;
|
||||
for (const line of lines) {
|
||||
this.logger.logSystemMessage(
|
||||
line,
|
||||
{},
|
||||
'targetStateRejection',
|
||||
false,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
log.error(`Failed to get target state for device: ${err}`);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
public async fetchDevice(
|
||||
@ -333,57 +357,6 @@ export class APIBinder {
|
||||
.timeout(conf.apiTimeout)) as Device;
|
||||
}
|
||||
|
||||
public async getTargetState(): Promise<TargetState> {
|
||||
const { uuid, apiEndpoint, apiTimeout } = await config.getMany([
|
||||
'uuid',
|
||||
'apiEndpoint',
|
||||
'apiTimeout',
|
||||
]);
|
||||
|
||||
if (!_.isString(apiEndpoint)) {
|
||||
throw new InternalInconsistencyError(
|
||||
'Non-string apiEndpoint passed to ApiBinder.getTargetState',
|
||||
);
|
||||
}
|
||||
if (this.cachedBalenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'Attempt to get target state without an API client',
|
||||
);
|
||||
}
|
||||
|
||||
const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`);
|
||||
const requestParams = _.extend(
|
||||
{ method: 'GET', url: endpoint },
|
||||
this.cachedBalenaApi.passthrough,
|
||||
);
|
||||
|
||||
return (await this.cachedBalenaApi
|
||||
._request(requestParams)
|
||||
.timeout(apiTimeout)) as TargetState;
|
||||
}
|
||||
|
||||
// TODO: Once 100% typescript, change this to a native promise
|
||||
public startTargetStatePoll(): Bluebird<null> {
|
||||
return Bluebird.try(() => {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
'Trying to start poll without initializing API client',
|
||||
);
|
||||
}
|
||||
config
|
||||
.get('instantUpdates')
|
||||
.catch(() => {
|
||||
// Default to skipping the initial update if we couldn't fetch the setting
|
||||
// which should be the exceptional case
|
||||
return false;
|
||||
})
|
||||
.then((instantUpdates) => {
|
||||
this.pollTargetState(!instantUpdates);
|
||||
});
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public startCurrentStateReport() {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
@ -593,71 +566,6 @@ export class APIBinder {
|
||||
return null;
|
||||
}
|
||||
|
||||
private getAndSetTargetState(force: boolean, isFromApi = false) {
|
||||
return Bluebird.using(this.lockGetTarget(), async () => {
|
||||
const targetState = await this.getTargetState();
|
||||
if (isFromApi || !_.isEqual(targetState, this.lastTarget)) {
|
||||
await this.deviceState.setTarget(targetState);
|
||||
this.lastTarget = _.cloneDeep(targetState);
|
||||
this.deviceState.triggerApplyTarget({ force, isFromApi });
|
||||
}
|
||||
})
|
||||
.tapCatch(ContractValidationError, ContractViolationError, (e) => {
|
||||
log.error(`Could not store target state for device: ${e}`);
|
||||
// the dashboard does not display lines correctly,
|
||||
// split them explcitly here
|
||||
const lines = e.message.split(/\r?\n/);
|
||||
lines[0] = `Could not move to new release: ${lines[0]}`;
|
||||
for (const line of lines) {
|
||||
this.logger.logSystemMessage(line, {}, 'targetStateRejection', false);
|
||||
}
|
||||
})
|
||||
.tapCatch(
|
||||
(e: unknown) =>
|
||||
!(
|
||||
e instanceof ContractValidationError ||
|
||||
e instanceof ContractViolationError
|
||||
),
|
||||
(err) => {
|
||||
log.error(`Failed to get target state for device: ${err}`);
|
||||
},
|
||||
)
|
||||
.finally(() => {
|
||||
this.lastTargetStateFetch = process.hrtime();
|
||||
});
|
||||
}
|
||||
|
||||
private async pollTargetState(skipFirstGet: boolean = false): Promise<void> {
|
||||
let appUpdatePollInterval;
|
||||
try {
|
||||
appUpdatePollInterval = await config.get('appUpdatePollInterval');
|
||||
if (!skipFirstGet) {
|
||||
await this.getAndSetTargetState(false);
|
||||
this.targetStateFetchErrors = 0;
|
||||
}
|
||||
|
||||
// We add a random jitter up to `maxApiJitterDelay` to
|
||||
// space out poll requests
|
||||
const pollInterval =
|
||||
Math.random() * constants.maxApiJitterDelay + appUpdatePollInterval;
|
||||
|
||||
await Bluebird.delay(pollInterval);
|
||||
} catch (e) {
|
||||
const pollInterval = Math.min(
|
||||
// If we failed fetching the appUpdatePollInterval then there won't have been any network
|
||||
// requests so we default it to a low value of 10s since we don't need to worry about too
|
||||
// many network requests
|
||||
appUpdatePollInterval ?? 10000,
|
||||
15000 * 2 ** this.targetStateFetchErrors,
|
||||
);
|
||||
++this.targetStateFetchErrors;
|
||||
|
||||
await Bluebird.delay(pollInterval);
|
||||
} finally {
|
||||
await this.pollTargetState();
|
||||
}
|
||||
}
|
||||
|
||||
private async pinDevice({ app, commit }: DevicePinInfo) {
|
||||
if (this.balenaApi == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
@ -720,7 +628,7 @@ export class APIBinder {
|
||||
}
|
||||
|
||||
const targetConfigUnformatted = _.get(
|
||||
await this.getTargetState(),
|
||||
await TargetState.get(),
|
||||
'local.config',
|
||||
);
|
||||
if (targetConfigUnformatted == null) {
|
||||
@ -974,12 +882,6 @@ export class APIBinder {
|
||||
return conf;
|
||||
}
|
||||
|
||||
private lockGetTarget() {
|
||||
return writeLock('getTarget').disposer((release) => {
|
||||
release();
|
||||
});
|
||||
}
|
||||
|
||||
private createAPIBinderRouter(apiBinder: APIBinder): express.Router {
|
||||
const router = express.Router();
|
||||
|
||||
@ -993,9 +895,7 @@ export class APIBinder {
|
||||
.get('instantUpdates')
|
||||
.then((instantUpdates) => {
|
||||
if (instantUpdates) {
|
||||
apiBinder
|
||||
.getAndSetTargetState(req.body.force, true)
|
||||
.catch(_.noop);
|
||||
TargetState.update(req.body.force, true).catch(_.noop);
|
||||
res.sendStatus(204);
|
||||
} else {
|
||||
log.debug(
|
||||
|
@ -23,7 +23,7 @@ import {
|
||||
} from './lib/errors';
|
||||
import { pathExistsOnHost } from './lib/fs-utils';
|
||||
|
||||
import { TargetStateAccessor } from './target-state';
|
||||
import { TargetStateAccessor } from './device-state/target-state-cache';
|
||||
|
||||
import { ServiceManager } from './compose/service-manager';
|
||||
import { Service } from './compose/service';
|
||||
|
@ -1,8 +1,8 @@
|
||||
import * as _ from 'lodash';
|
||||
|
||||
import { ApplicationManager } from './application-manager';
|
||||
import * as config from './config';
|
||||
import * as db from './db';
|
||||
import { ApplicationManager } from '../application-manager';
|
||||
import * as config from '../config';
|
||||
import * as db from '../db';
|
||||
|
||||
// Once we have correct types for both applications and the
|
||||
// incoming target state this should be changed
|
191
src/device-state/target-state.ts
Normal file
191
src/device-state/target-state.ts
Normal file
@ -0,0 +1,191 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import * as url from 'url';
|
||||
import type { Headers } from 'request';
|
||||
import { delay } from 'bluebird';
|
||||
import * as _ from 'lodash';
|
||||
import Bluebird = require('bluebird');
|
||||
import type StrictEventEmitter from 'strict-event-emitter-types';
|
||||
|
||||
import type { TargetState } from '../types/state';
|
||||
import { InternalInconsistencyError } from '../lib/errors';
|
||||
import { getRequestInstance } from '../lib/request';
|
||||
import * as config from '../config';
|
||||
import { writeLock } from '../lib/update-lock';
|
||||
import constants = require('../lib/constants');
|
||||
|
||||
interface TargetStateEvents {
|
||||
'target-state-update': (
|
||||
targetState: TargetState,
|
||||
force: boolean,
|
||||
isFromApi: boolean,
|
||||
) => void;
|
||||
}
|
||||
export const emitter: StrictEventEmitter<
|
||||
EventEmitter,
|
||||
TargetStateEvents
|
||||
> = new EventEmitter();
|
||||
|
||||
const lockGetTarget = () =>
|
||||
writeLock('getTarget').disposer((release) => release());
|
||||
|
||||
let cache: {
|
||||
etag?: string | string[];
|
||||
body: TargetState;
|
||||
};
|
||||
|
||||
/**
|
||||
* appUpdatePollInterval is set when startPoll successfuly queries the config
|
||||
*/
|
||||
let appUpdatePollInterval: number;
|
||||
|
||||
/**
|
||||
* Listen for config changes to appUpdatePollInterval
|
||||
*/
|
||||
(async () => {
|
||||
await config.initialized;
|
||||
config.on('change', (changedConfig) => {
|
||||
if (changedConfig.appUpdatePollInterval) {
|
||||
appUpdatePollInterval = changedConfig.appUpdatePollInterval;
|
||||
}
|
||||
});
|
||||
})();
|
||||
|
||||
/**
|
||||
* The last fetch attempt
|
||||
*
|
||||
* We set a value rather then being undeclared because having it undefined
|
||||
* adds more overhead to dealing with this value without any benefits.
|
||||
*/
|
||||
export let lastFetch: ReturnType<typeof process.hrtime> = process.hrtime();
|
||||
|
||||
/**
|
||||
* Attempts to update the target state
|
||||
* @param force Emitted with the 'target-state-update' event update as necessary
|
||||
* @param isFromApi Emitted with the 'target-state-update' event update as necessary
|
||||
*/
|
||||
export const update = async (
|
||||
// TODO: Is there a better way than passing these params here just to emit them if there is an update?
|
||||
force = false,
|
||||
isFromApi = false,
|
||||
): Promise<void> => {
|
||||
await config.initialized;
|
||||
return Bluebird.using(lockGetTarget(), async () => {
|
||||
const { uuid, apiEndpoint, apiTimeout } = await config.getMany([
|
||||
'uuid',
|
||||
'apiEndpoint',
|
||||
'apiTimeout',
|
||||
]);
|
||||
|
||||
if (typeof apiEndpoint !== 'string') {
|
||||
throw new InternalInconsistencyError(
|
||||
'Non-string apiEndpoint passed to ApiBinder.getTargetState',
|
||||
);
|
||||
}
|
||||
|
||||
const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`);
|
||||
const request = await getRequestInstance();
|
||||
|
||||
const params: Headers = {
|
||||
json: true,
|
||||
};
|
||||
|
||||
if (typeof cache?.etag === 'string') {
|
||||
params.headers = {
|
||||
'If-None-Match': cache.etag,
|
||||
};
|
||||
}
|
||||
|
||||
const [{ statusCode, headers }, body] = await request
|
||||
.getAsync(endpoint, {
|
||||
json: true,
|
||||
})
|
||||
.timeout(apiTimeout);
|
||||
|
||||
if (statusCode === 304) {
|
||||
// There's no change so no need to update the cache or emit a change event
|
||||
return;
|
||||
}
|
||||
|
||||
cache = {
|
||||
etag: headers.etag,
|
||||
body,
|
||||
};
|
||||
|
||||
emitter.emit('target-state-update', _.cloneDeep(body), force, isFromApi);
|
||||
}).finally(() => {
|
||||
lastFetch = process.hrtime();
|
||||
});
|
||||
};
|
||||
|
||||
const poll = async (
|
||||
skipFirstGet: boolean = false,
|
||||
fetchErrors: number = 0,
|
||||
): Promise<void> => {
|
||||
// Add random jitter up to `maxApiJitterDelay` to space out poll requests
|
||||
let pollInterval =
|
||||
Math.random() * constants.maxApiJitterDelay + appUpdatePollInterval;
|
||||
|
||||
// Convenience function used for delaying poll loops
|
||||
const delayedLoop = async (delayBy: number) => {
|
||||
// Wait until we want to poll again
|
||||
await delay(delayBy);
|
||||
// Poll again
|
||||
await poll(false, fetchErrors);
|
||||
};
|
||||
|
||||
// Check if we want to skip first request and just loop again
|
||||
if (skipFirstGet) {
|
||||
return delayedLoop(pollInterval);
|
||||
}
|
||||
|
||||
// Try to fetch latest target state
|
||||
try {
|
||||
await update();
|
||||
} catch (e) {
|
||||
// Exponential back off if request fails
|
||||
pollInterval = Math.min(appUpdatePollInterval, 15000 * 2 ** fetchErrors);
|
||||
++fetchErrors;
|
||||
} finally {
|
||||
// Reset fetchErrors because we successfuly updated
|
||||
fetchErrors = 0;
|
||||
// Wait to poll again
|
||||
await delayedLoop(pollInterval);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get the latest target state, attempting a fetch first if we do not already have one
|
||||
*/
|
||||
export const get = async (): Promise<TargetState> => {
|
||||
if (cache == null) {
|
||||
await update();
|
||||
}
|
||||
return _.cloneDeep(cache.body);
|
||||
};
|
||||
|
||||
/**
|
||||
* Start polling for target state updates
|
||||
*
|
||||
* startPoll will try to query the config for all values needed
|
||||
* to being polling. If there is an issue obtaining these values
|
||||
* the function will wait 10 seconds and retry until successful.
|
||||
*/
|
||||
export const startPoll = async (): Promise<void> => {
|
||||
let instantUpdates;
|
||||
try {
|
||||
// Query and set config values we need to avoid multiple db hits
|
||||
const {
|
||||
instantUpdates: updates,
|
||||
appUpdatePollInterval: interval,
|
||||
} = await config.getMany(['instantUpdates', 'appUpdatePollInterval']);
|
||||
instantUpdates = updates;
|
||||
appUpdatePollInterval = interval;
|
||||
} catch {
|
||||
// Delay 10 seconds and retry loading config
|
||||
await delay(10000);
|
||||
// Attempt to start poll again
|
||||
return startPoll();
|
||||
}
|
||||
// All config values fetch, start polling!
|
||||
return poll(!instantUpdates);
|
||||
};
|
@ -12,6 +12,7 @@ import chai = require('./lib/chai-config');
|
||||
import balenaAPI = require('./lib/mocked-balena-api');
|
||||
import { schema } from '../src/config/schema';
|
||||
import ConfigJsonConfigBackend from '../src/config/configJson';
|
||||
import * as TargetState from '../src/device-state/target-state';
|
||||
|
||||
const { expect } = chai;
|
||||
|
||||
@ -310,10 +311,13 @@ describe('ApiBinder', () => {
|
||||
const components: Dictionary<any> = {};
|
||||
let configStub: SinonStub;
|
||||
let infoLobSpy: SinonSpy;
|
||||
let previousLastFetch: ReturnType<typeof process.hrtime>;
|
||||
|
||||
before(async () => {
|
||||
await initModels(components, '/config-apibinder.json');
|
||||
previousLastFetch = TargetState.lastFetch;
|
||||
});
|
||||
|
||||
after(async () => {
|
||||
// @ts-ignore
|
||||
config.configJsonBackend = defaultConfigBackend;
|
||||
@ -330,6 +334,7 @@ describe('ApiBinder', () => {
|
||||
afterEach(() => {
|
||||
configStub.restore();
|
||||
infoLobSpy.restore();
|
||||
(TargetState as any).lastFetch = previousLastFetch;
|
||||
});
|
||||
|
||||
it('passes with correct conditions', async () => {
|
||||
@ -340,6 +345,8 @@ describe('ApiBinder', () => {
|
||||
appUpdatePollInterval: 1000,
|
||||
connectivityCheckEnabled: false,
|
||||
});
|
||||
// Set lastFetch to now so it is within appUpdatePollInterval
|
||||
(TargetState as any).lastFetch = process.hrtime();
|
||||
expect(await components.apiBinder.healthcheck()).to.equal(true);
|
||||
});
|
||||
|
||||
@ -393,6 +400,8 @@ describe('ApiBinder', () => {
|
||||
appUpdatePollInterval: 1000,
|
||||
connectivityCheckEnabled: true,
|
||||
});
|
||||
// Set lastFetch to now so it is within appUpdatePollInterval
|
||||
(TargetState as any).lastFetch = process.hrtime();
|
||||
// Copy previous values to restore later
|
||||
const previousStateReportErrors = components.apiBinder.stateReportErrors;
|
||||
const previousDeviceStateConnected =
|
||||
|
Loading…
Reference in New Issue
Block a user