From 17a5d8dd4918916c954990cfb9666754434ef527 Mon Sep 17 00:00:00 2001 From: Pagan Gazzard Date: Sat, 16 May 2020 14:08:14 +0100 Subject: [PATCH] Isolate target state fetching to its own module which emits on update Change-type: minor --- src/api-binder.ts | 172 ++++------------ src/application-manager.js | 2 +- .../target-state-cache.ts} | 6 +- src/device-state/target-state.ts | 191 ++++++++++++++++++ test/11-api-binder.spec.ts | 9 + 5 files changed, 240 insertions(+), 140 deletions(-) rename src/{target-state.ts => device-state/target-state-cache.ts} (94%) create mode 100644 src/device-state/target-state.ts diff --git a/src/api-binder.ts b/src/api-binder.ts index c057f3b4..0f4d8435 100644 --- a/src/api-binder.ts +++ b/src/api-binder.ts @@ -22,14 +22,14 @@ import { isHttpConflictError, } from './lib/errors'; import * as request from './lib/request'; -import { writeLock } from './lib/update-lock'; -import { DeviceStatus, TargetState } from './types/state'; +import { DeviceStatus } from './types/state'; import log from './lib/supervisor-console'; import DeviceState from './device-state'; import * as globalEventBus from './event-bus'; import Logger from './logger'; +import * as TargetState from './device-state/target-state'; // The exponential backoff starts at 15s const MINIMUM_BACKOFF_DELAY = 15000; @@ -72,8 +72,6 @@ export class APIBinder { private logger: Logger; public balenaApi: PinejsClientRequest | null = null; - // TODO{type}: Retype me when all types are sorted - private cachedBalenaApi: PinejsClientRequest | null = null; private lastReportedState: DeviceStatus = { local: {}, dependent: {}, @@ -82,11 +80,8 @@ export class APIBinder { local: {}, dependent: {}, }; - private lastTarget: TargetState; - private lastTargetStateFetch = process.hrtime(); private reportPending = false; private stateReportErrors = 0; - private targetStateFetchErrors = 0; private readyForUpdates = false; public constructor({ eventTracker, logger }: APIBinderConstructOpts) { @@ -124,7 +119,7 @@ export class APIBinder { } // Check last time target state has been polled - const timeSinceLastFetch = process.hrtime(this.lastTargetStateFetch); + const timeSinceLastFetch = process.hrtime(TargetState.lastFetch); const timeSinceLastFetchMs = timeSinceLastFetch[0] * 1000 + timeSinceLastFetch[1] / 1e6; @@ -177,7 +172,6 @@ export class APIBinder { apiPrefix: baseUrl, passthrough, }); - this.cachedBalenaApi = this.balenaApi.clone({}, { cache: {} }); } public async start() { @@ -232,7 +226,37 @@ export class APIBinder { this.readyForUpdates = true; log.debug('Starting target state poll'); - this.startTargetStatePoll(); + TargetState.startPoll(); + TargetState.emitter.on( + 'target-state-update', + async (targetState, force, isFromApi) => { + try { + await this.deviceState.setTarget(targetState); + this.deviceState.triggerApplyTarget({ force, isFromApi }); + } catch (err) { + if ( + err instanceof ContractValidationError || + err instanceof ContractViolationError + ) { + log.error(`Could not store target state for device: ${err}`); + // the dashboard does not display lines correctly, + // split them explcitly here + const lines = err.message.split(/\r?\n/); + lines[0] = `Could not move to new release: ${lines[0]}`; + for (const line of lines) { + this.logger.logSystemMessage( + line, + {}, + 'targetStateRejection', + false, + ); + } + } else { + log.error(`Failed to get target state for device: ${err}`); + } + } + }, + ); } public async fetchDevice( @@ -333,57 +357,6 @@ export class APIBinder { .timeout(conf.apiTimeout)) as Device; } - public async getTargetState(): Promise { - const { uuid, apiEndpoint, apiTimeout } = await config.getMany([ - 'uuid', - 'apiEndpoint', - 'apiTimeout', - ]); - - if (!_.isString(apiEndpoint)) { - throw new InternalInconsistencyError( - 'Non-string apiEndpoint passed to ApiBinder.getTargetState', - ); - } - if (this.cachedBalenaApi == null) { - throw new InternalInconsistencyError( - 'Attempt to get target state without an API client', - ); - } - - const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`); - const requestParams = _.extend( - { method: 'GET', url: endpoint }, - this.cachedBalenaApi.passthrough, - ); - - return (await this.cachedBalenaApi - ._request(requestParams) - .timeout(apiTimeout)) as TargetState; - } - - // TODO: Once 100% typescript, change this to a native promise - public startTargetStatePoll(): Bluebird { - return Bluebird.try(() => { - if (this.balenaApi == null) { - throw new InternalInconsistencyError( - 'Trying to start poll without initializing API client', - ); - } - config - .get('instantUpdates') - .catch(() => { - // Default to skipping the initial update if we couldn't fetch the setting - // which should be the exceptional case - return false; - }) - .then((instantUpdates) => { - this.pollTargetState(!instantUpdates); - }); - return null; - }); - } - public startCurrentStateReport() { if (this.balenaApi == null) { throw new InternalInconsistencyError( @@ -593,71 +566,6 @@ export class APIBinder { return null; } - private getAndSetTargetState(force: boolean, isFromApi = false) { - return Bluebird.using(this.lockGetTarget(), async () => { - const targetState = await this.getTargetState(); - if (isFromApi || !_.isEqual(targetState, this.lastTarget)) { - await this.deviceState.setTarget(targetState); - this.lastTarget = _.cloneDeep(targetState); - this.deviceState.triggerApplyTarget({ force, isFromApi }); - } - }) - .tapCatch(ContractValidationError, ContractViolationError, (e) => { - log.error(`Could not store target state for device: ${e}`); - // the dashboard does not display lines correctly, - // split them explcitly here - const lines = e.message.split(/\r?\n/); - lines[0] = `Could not move to new release: ${lines[0]}`; - for (const line of lines) { - this.logger.logSystemMessage(line, {}, 'targetStateRejection', false); - } - }) - .tapCatch( - (e: unknown) => - !( - e instanceof ContractValidationError || - e instanceof ContractViolationError - ), - (err) => { - log.error(`Failed to get target state for device: ${err}`); - }, - ) - .finally(() => { - this.lastTargetStateFetch = process.hrtime(); - }); - } - - private async pollTargetState(skipFirstGet: boolean = false): Promise { - let appUpdatePollInterval; - try { - appUpdatePollInterval = await config.get('appUpdatePollInterval'); - if (!skipFirstGet) { - await this.getAndSetTargetState(false); - this.targetStateFetchErrors = 0; - } - - // We add a random jitter up to `maxApiJitterDelay` to - // space out poll requests - const pollInterval = - Math.random() * constants.maxApiJitterDelay + appUpdatePollInterval; - - await Bluebird.delay(pollInterval); - } catch (e) { - const pollInterval = Math.min( - // If we failed fetching the appUpdatePollInterval then there won't have been any network - // requests so we default it to a low value of 10s since we don't need to worry about too - // many network requests - appUpdatePollInterval ?? 10000, - 15000 * 2 ** this.targetStateFetchErrors, - ); - ++this.targetStateFetchErrors; - - await Bluebird.delay(pollInterval); - } finally { - await this.pollTargetState(); - } - } - private async pinDevice({ app, commit }: DevicePinInfo) { if (this.balenaApi == null) { throw new InternalInconsistencyError( @@ -720,7 +628,7 @@ export class APIBinder { } const targetConfigUnformatted = _.get( - await this.getTargetState(), + await TargetState.get(), 'local.config', ); if (targetConfigUnformatted == null) { @@ -974,12 +882,6 @@ export class APIBinder { return conf; } - private lockGetTarget() { - return writeLock('getTarget').disposer((release) => { - release(); - }); - } - private createAPIBinderRouter(apiBinder: APIBinder): express.Router { const router = express.Router(); @@ -993,9 +895,7 @@ export class APIBinder { .get('instantUpdates') .then((instantUpdates) => { if (instantUpdates) { - apiBinder - .getAndSetTargetState(req.body.force, true) - .catch(_.noop); + TargetState.update(req.body.force, true).catch(_.noop); res.sendStatus(204); } else { log.debug( diff --git a/src/application-manager.js b/src/application-manager.js index 38212059..95f22dbe 100644 --- a/src/application-manager.js +++ b/src/application-manager.js @@ -23,7 +23,7 @@ import { } from './lib/errors'; import { pathExistsOnHost } from './lib/fs-utils'; -import { TargetStateAccessor } from './target-state'; +import { TargetStateAccessor } from './device-state/target-state-cache'; import { ServiceManager } from './compose/service-manager'; import { Service } from './compose/service'; diff --git a/src/target-state.ts b/src/device-state/target-state-cache.ts similarity index 94% rename from src/target-state.ts rename to src/device-state/target-state-cache.ts index 6488ee11..6d83d8b1 100644 --- a/src/target-state.ts +++ b/src/device-state/target-state-cache.ts @@ -1,8 +1,8 @@ import * as _ from 'lodash'; -import { ApplicationManager } from './application-manager'; -import * as config from './config'; -import * as db from './db'; +import { ApplicationManager } from '../application-manager'; +import * as config from '../config'; +import * as db from '../db'; // Once we have correct types for both applications and the // incoming target state this should be changed diff --git a/src/device-state/target-state.ts b/src/device-state/target-state.ts new file mode 100644 index 00000000..dd6c755f --- /dev/null +++ b/src/device-state/target-state.ts @@ -0,0 +1,191 @@ +import { EventEmitter } from 'events'; +import * as url from 'url'; +import type { Headers } from 'request'; +import { delay } from 'bluebird'; +import * as _ from 'lodash'; +import Bluebird = require('bluebird'); +import type StrictEventEmitter from 'strict-event-emitter-types'; + +import type { TargetState } from '../types/state'; +import { InternalInconsistencyError } from '../lib/errors'; +import { getRequestInstance } from '../lib/request'; +import * as config from '../config'; +import { writeLock } from '../lib/update-lock'; +import constants = require('../lib/constants'); + +interface TargetStateEvents { + 'target-state-update': ( + targetState: TargetState, + force: boolean, + isFromApi: boolean, + ) => void; +} +export const emitter: StrictEventEmitter< + EventEmitter, + TargetStateEvents +> = new EventEmitter(); + +const lockGetTarget = () => + writeLock('getTarget').disposer((release) => release()); + +let cache: { + etag?: string | string[]; + body: TargetState; +}; + +/** + * appUpdatePollInterval is set when startPoll successfuly queries the config + */ +let appUpdatePollInterval: number; + +/** + * Listen for config changes to appUpdatePollInterval + */ +(async () => { + await config.initialized; + config.on('change', (changedConfig) => { + if (changedConfig.appUpdatePollInterval) { + appUpdatePollInterval = changedConfig.appUpdatePollInterval; + } + }); +})(); + +/** + * The last fetch attempt + * + * We set a value rather then being undeclared because having it undefined + * adds more overhead to dealing with this value without any benefits. + */ +export let lastFetch: ReturnType = process.hrtime(); + +/** + * Attempts to update the target state + * @param force Emitted with the 'target-state-update' event update as necessary + * @param isFromApi Emitted with the 'target-state-update' event update as necessary + */ +export const update = async ( + // TODO: Is there a better way than passing these params here just to emit them if there is an update? + force = false, + isFromApi = false, +): Promise => { + await config.initialized; + return Bluebird.using(lockGetTarget(), async () => { + const { uuid, apiEndpoint, apiTimeout } = await config.getMany([ + 'uuid', + 'apiEndpoint', + 'apiTimeout', + ]); + + if (typeof apiEndpoint !== 'string') { + throw new InternalInconsistencyError( + 'Non-string apiEndpoint passed to ApiBinder.getTargetState', + ); + } + + const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`); + const request = await getRequestInstance(); + + const params: Headers = { + json: true, + }; + + if (typeof cache?.etag === 'string') { + params.headers = { + 'If-None-Match': cache.etag, + }; + } + + const [{ statusCode, headers }, body] = await request + .getAsync(endpoint, { + json: true, + }) + .timeout(apiTimeout); + + if (statusCode === 304) { + // There's no change so no need to update the cache or emit a change event + return; + } + + cache = { + etag: headers.etag, + body, + }; + + emitter.emit('target-state-update', _.cloneDeep(body), force, isFromApi); + }).finally(() => { + lastFetch = process.hrtime(); + }); +}; + +const poll = async ( + skipFirstGet: boolean = false, + fetchErrors: number = 0, +): Promise => { + // Add random jitter up to `maxApiJitterDelay` to space out poll requests + let pollInterval = + Math.random() * constants.maxApiJitterDelay + appUpdatePollInterval; + + // Convenience function used for delaying poll loops + const delayedLoop = async (delayBy: number) => { + // Wait until we want to poll again + await delay(delayBy); + // Poll again + await poll(false, fetchErrors); + }; + + // Check if we want to skip first request and just loop again + if (skipFirstGet) { + return delayedLoop(pollInterval); + } + + // Try to fetch latest target state + try { + await update(); + } catch (e) { + // Exponential back off if request fails + pollInterval = Math.min(appUpdatePollInterval, 15000 * 2 ** fetchErrors); + ++fetchErrors; + } finally { + // Reset fetchErrors because we successfuly updated + fetchErrors = 0; + // Wait to poll again + await delayedLoop(pollInterval); + } +}; + +/** + * Get the latest target state, attempting a fetch first if we do not already have one + */ +export const get = async (): Promise => { + if (cache == null) { + await update(); + } + return _.cloneDeep(cache.body); +}; + +/** + * Start polling for target state updates + * + * startPoll will try to query the config for all values needed + * to being polling. If there is an issue obtaining these values + * the function will wait 10 seconds and retry until successful. + */ +export const startPoll = async (): Promise => { + let instantUpdates; + try { + // Query and set config values we need to avoid multiple db hits + const { + instantUpdates: updates, + appUpdatePollInterval: interval, + } = await config.getMany(['instantUpdates', 'appUpdatePollInterval']); + instantUpdates = updates; + appUpdatePollInterval = interval; + } catch { + // Delay 10 seconds and retry loading config + await delay(10000); + // Attempt to start poll again + return startPoll(); + } + // All config values fetch, start polling! + return poll(!instantUpdates); +}; diff --git a/test/11-api-binder.spec.ts b/test/11-api-binder.spec.ts index 283dbc58..e5fdbb07 100644 --- a/test/11-api-binder.spec.ts +++ b/test/11-api-binder.spec.ts @@ -12,6 +12,7 @@ import chai = require('./lib/chai-config'); import balenaAPI = require('./lib/mocked-balena-api'); import { schema } from '../src/config/schema'; import ConfigJsonConfigBackend from '../src/config/configJson'; +import * as TargetState from '../src/device-state/target-state'; const { expect } = chai; @@ -310,10 +311,13 @@ describe('ApiBinder', () => { const components: Dictionary = {}; let configStub: SinonStub; let infoLobSpy: SinonSpy; + let previousLastFetch: ReturnType; before(async () => { await initModels(components, '/config-apibinder.json'); + previousLastFetch = TargetState.lastFetch; }); + after(async () => { // @ts-ignore config.configJsonBackend = defaultConfigBackend; @@ -330,6 +334,7 @@ describe('ApiBinder', () => { afterEach(() => { configStub.restore(); infoLobSpy.restore(); + (TargetState as any).lastFetch = previousLastFetch; }); it('passes with correct conditions', async () => { @@ -340,6 +345,8 @@ describe('ApiBinder', () => { appUpdatePollInterval: 1000, connectivityCheckEnabled: false, }); + // Set lastFetch to now so it is within appUpdatePollInterval + (TargetState as any).lastFetch = process.hrtime(); expect(await components.apiBinder.healthcheck()).to.equal(true); }); @@ -393,6 +400,8 @@ describe('ApiBinder', () => { appUpdatePollInterval: 1000, connectivityCheckEnabled: true, }); + // Set lastFetch to now so it is within appUpdatePollInterval + (TargetState as any).lastFetch = process.hrtime(); // Copy previous values to restore later const previousStateReportErrors = components.apiBinder.stateReportErrors; const previousDeviceStateConnected =