Merge pull request #1169 from balena-io/device-state-typescript

Convert device-state module to typescript
This commit is contained in:
CameronDiver 2020-02-25 19:55:44 +07:00 committed by GitHub
commit 98b3c29e5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1047 additions and 622 deletions

14
package-lock.json generated
View File

@ -8927,9 +8927,9 @@
}
},
"parse-ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/parse-ms/-/parse-ms-2.0.0.tgz",
"integrity": "sha512-AddiXFSLLCqj+tCRJ9MrUtHZB4DWojO3tk0NVZ+g5MaMQHF2+p2ktqxuoXyPFLljz/aUK0Nfhd/uGWnhXVXEyA==",
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/parse-ms/-/parse-ms-2.1.0.tgz",
"integrity": "sha512-kHt7kzLoS9VBZfUsiKjv43mr91ea+U05EyKkEtqp7vNbHxmaVuEqN7XxeEVnGrMtYOAxGrDElSi96K7EgO1zCA==",
"dev": true
},
"parse-passwd": {
@ -9163,12 +9163,12 @@
"dev": true
},
"pretty-ms": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/pretty-ms/-/pretty-ms-4.0.0.tgz",
"integrity": "sha512-qG66ahoLCwpLXD09ZPHSCbUWYTqdosB7SMP4OffgTgL2PBKXMuUsrk5Bwg8q4qPkjTXsKBMr+YK3Ltd/6F9s/Q==",
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/pretty-ms/-/pretty-ms-5.1.0.tgz",
"integrity": "sha512-4gaK1skD2gwscCfkswYQRmddUb2GJZtzDGRjHWadVHtK/DIKFufa12MvES6/xu1tVbUYeia5bmLcwJtZJQUqnw==",
"dev": true,
"requires": {
"parse-ms": "^2.0.0"
"parse-ms": "^2.1.0"
}
},
"process": {

View File

@ -103,7 +103,7 @@
"network-checker": "^0.1.1",
"pinejs-client-request": "^5.2.0",
"prettier": "1.19.1",
"pretty-ms": "^4.0.0",
"pretty-ms": "^5.1.0",
"request": "^2.51.0",
"resin-lint": "^3.2.4",
"resin-register-device": "^3.0.0",

View File

@ -25,11 +25,11 @@ import {
} from './lib/errors';
import * as request from './lib/request';
import { writeLock } from './lib/update-lock';
import { DeviceApplicationState, TargetState } from './types/state';
import { DeviceStatus, TargetState } from './types/state';
import log from './lib/supervisor-console';
import DeviceState = require('./device-state');
import DeviceState from './device-state';
import Logger from './logger';
// The exponential backoff starts at 15s
@ -80,11 +80,11 @@ export class APIBinder {
public balenaApi: PinejsClientRequest | null = null;
// TODO{type}: Retype me when all types are sorted
private cachedBalenaApi: PinejsClientRequest | null = null;
private lastReportedState: DeviceApplicationState = {
private lastReportedState: DeviceStatus = {
local: {},
dependent: {},
};
private stateForReport: DeviceApplicationState = {
private stateForReport: DeviceStatus = {
local: {},
dependent: {},
};
@ -410,7 +410,7 @@ export class APIBinder {
});
}
private getStateDiff(): DeviceApplicationState {
private getStateDiff(): DeviceStatus {
const lastReportedLocal = this.lastReportedState.local;
const lastReportedDependent = this.lastReportedState.dependent;
if (lastReportedLocal == null || lastReportedDependent == null) {
@ -423,13 +423,13 @@ export class APIBinder {
const diff = {
local: _(this.stateForReport.local)
.omitBy((val, key: keyof DeviceApplicationState['local']) =>
.omitBy((val, key: keyof DeviceStatus['local']) =>
_.isEqual(lastReportedLocal[key], val),
)
.omit(INTERNAL_STATE_KEYS)
.value(),
dependent: _(this.stateForReport.dependent)
.omitBy((val, key: keyof DeviceApplicationState['dependent']) =>
.omitBy((val, key: keyof DeviceStatus['dependent']) =>
_.isEqual(lastReportedDependent[key], val),
)
.omit(INTERNAL_STATE_KEYS)
@ -440,7 +440,7 @@ export class APIBinder {
}
private async sendReportPatch(
stateDiff: DeviceApplicationState,
stateDiff: DeviceStatus,
conf: { apiEndpoint: string; uuid: string; localMode: boolean },
) {
if (this.cachedBalenaApi == null) {
@ -478,9 +478,7 @@ export class APIBinder {
// Returns an object that contains only status fields relevant for the local mode.
// It basically removes information about applications state.
public stripDeviceStateInLocalMode(
state: DeviceApplicationState,
): DeviceApplicationState {
public stripDeviceStateInLocalMode(state: DeviceStatus): DeviceStatus {
return {
local: _.cloneDeep(
_.omit(state.local, 'apps', 'is_on__commit', 'logs_channel'),
@ -710,6 +708,11 @@ export class APIBinder {
);
const deviceId = await this.config.get('deviceId');
if (!currentState.local.config) {
throw new InternalInconsistencyError(
'No config defined in reportInitialEnv',
);
}
const currentConfig: Dictionary<string> = currentState.local.config;
for (const [key, value] of _.toPairs(currentConfig)) {
let varValue = value;

View File

@ -1,14 +1,17 @@
import * as Bluebird from 'bluebird';
import { EventEmitter } from 'events';
import { Router } from 'express';
import Knex = require('knex');
import { ServiceAction } from './device-api/common';
import { EventTracker } from './event-tracker';
import { Logger } from './logger';
import { DeviceApplicationState } from './types/state';
import { DeviceStatus, InstancedAppState } from './types/state';
import ImageManager, { Image } from './compose/images';
import ServiceManager from './compose/service-manager';
import DB from './db';
import DeviceState from './device-state';
import { APIBinder } from './api-binder';
import Config from './config';
@ -16,6 +19,10 @@ import Config from './config';
import NetworkManager from './compose/network-manager';
import VolumeManager from './compose/volume-manager';
import {
CompositionStep,
CompositionStepAction,
} from './compose/composition-steps';
import Network from './compose/network';
import Service from './compose/service';
import Volume from './compose/volume';
@ -34,7 +41,7 @@ declare interface Application {
// This is a non-exhaustive typing for ApplicationManager to avoid
// having to recode the entire class (and all requirements in TS).
export class ApplicationManager extends EventEmitter {
class ApplicationManager extends EventEmitter {
// These probably could be typed, but the types are so messy that we're
// best just waiting for the relevant module to be recoded in typescript.
// At least any types we can be sure of then.
@ -43,7 +50,7 @@ export class ApplicationManager extends EventEmitter {
// typecript, type the following
public _lockingIfNecessary: any;
public logger: Logger;
public deviceState: any;
public deviceState: DeviceState;
public eventTracker: EventTracker;
public apiBinder: APIBinder;
public docker: DockerUtils;
@ -56,6 +63,22 @@ export class ApplicationManager extends EventEmitter {
public images: ImageManager;
public proxyvisor: any;
public timeSpentFetching: number;
public fetchesInProgress: number;
public validActions: string[];
public router: Router;
public constructor({
logger: Logger,
config: Config,
db: DB,
eventTracker: EventTracker,
deviceState: DeviceState,
});
public init(): Promise<void>;
public getCurrentApp(appId: number): Bluebird<Application | null>;
@ -67,21 +90,31 @@ export class ApplicationManager extends EventEmitter {
opts: Options,
): Bluebird<void>;
// FIXME: Type this properly as it's some mutant state between
// the state endpoint and the ApplicationManager internals
public getStatus(): Promise<Dictionay<any>>;
public setTarget(
local: any,
dependent: any,
source: string,
transaction: Knex.Transaction,
): Promise<void>;
public getStatus(): Promise<DeviceStatus>;
// The return type is incompleted
public getTargetApps(): Promise<
Dictionary<{
services: Dictionary<Service>;
volumes: Dictionary<Volume>;
networks: Dictionary<Network>;
}>
>;
public getTargetApps(): Promise<InstancedAppState>;
public stopAll(opts: { force?: boolean; skipLock?: boolean }): Promise<void>;
public serviceNameFromId(serviceId: number): Bluebird<string>;
public imageForService(svc: any): Promise<Image>;
public getDependentTargets(): Promise<any>;
public getCurrentForComparison(): Promise<any>;
public getDependentState(): Promise<any>;
public getExtraStateForComparison(current: any, target: any): Promise<any>;
public getRequiredSteps(
currentState: any,
targetState: any,
extraState: any,
ignoreImages?: boolean,
): Promise<Array<CompositionStep<CompositionStepAction>>>;
public localModeSwitchCompletion(): Promise<void>;
}
export default ApplicationManager;
export = ApplicationManager;

View File

@ -2,7 +2,7 @@ import * as _ from 'lodash';
import Config from '../config';
import ApplicationManager from '../application-manager';
import ApplicationManager = require('../application-manager');
import Images, { Image } from './images';
import Network from './network';
import Service from './service';
@ -95,9 +95,9 @@ interface CompositionStepArgs {
ensureSupervisorNetwork: {};
}
type CompositionStepAction = keyof CompositionStepArgs;
type CompositionStep<T extends CompositionStepAction> = {
step: T;
export type CompositionStepAction = keyof CompositionStepArgs;
export type CompositionStep<T extends CompositionStepAction> = {
action: T;
} & CompositionStepArgs[T];
export function generateStep<T extends CompositionStepAction>(
@ -105,7 +105,7 @@ export function generateStep<T extends CompositionStepAction>(
args: CompositionStepArgs[T],
): CompositionStep<T> {
return {
step: action,
action,
...args,
};
}

View File

@ -410,6 +410,7 @@ export class Service {
user: '',
workingDir: '',
tty: true,
running: true,
});
// Mutate service with extra features

View File

@ -54,7 +54,7 @@ export interface ServiceComposeConfig {
image: string;
init?: string | boolean;
labels?: { [labelName: string]: string };
running: boolean;
running?: boolean;
networkMode?: string;
networks?: string[] | ServiceNetworkDictionary;
pid?: string;

View File

@ -3,7 +3,7 @@ import { NextFunction, Request, Response, Router } from 'express';
import * as _ from 'lodash';
import { fs } from 'mz';
import { ApplicationManager } from '../application-manager';
import ApplicationManager = require('../application-manager');
import { Service } from '../compose/service';
import {
appNotFoundMessage,

View File

@ -12,7 +12,7 @@ import { UnitNotLoadedError } from './lib/errors';
import * as systemd from './lib/systemd';
import { EnvVarObject } from './lib/types';
import { checkInt, checkTruthy } from './lib/validation';
import { DeviceApplicationState } from './types/state';
import { DeviceStatus } from './types/state';
const vpnServiceName = 'openvpn-resin';
@ -29,7 +29,9 @@ interface ConfigOption {
rebootRequired?: boolean;
}
interface ConfigStep {
// FIXME: Bring this and the deviceState and
// applicationState steps together
export interface ConfigStep {
// TODO: This is a bit of a mess, the DeviceConfig class shouldn't
// know that the reboot action exists as it is implemented by
// DeviceState. Fix this weird circular dependency
@ -367,8 +369,8 @@ export class DeviceConfig {
}
public async getRequiredSteps(
currentState: DeviceApplicationState,
targetState: DeviceApplicationState,
currentState: DeviceStatus,
targetState: { local?: { config?: Dictionary<string> } },
): Promise<ConfigStep[]> {
const current: Dictionary<string> = _.get(
currentState,

View File

@ -1,512 +0,0 @@
Promise = require 'bluebird'
_ = require 'lodash'
EventEmitter = require 'events'
express = require 'express'
bodyParser = require 'body-parser'
prettyMs = require 'pretty-ms'
hostConfig = require './host-config'
network = require './network'
constants = require './lib/constants'
validation = require './lib/validation'
systemd = require './lib/systemd'
updateLock = require './lib/update-lock'
{ loadTargetFromFile } = require './device-state/preload'
{ UpdatesLockedError } = require './lib/errors'
{ DeviceConfig } = require './device-config'
ApplicationManager = require './application-manager'
{ log } = require './lib/supervisor-console'
validateLocalState = (state) ->
if state.name?
throw new Error('Invalid device name') if not validation.isValidShortText(state.name)
if !state.apps? or !validation.isValidAppsObject(state.apps)
throw new Error('Invalid apps')
if !state.config? or !validation.isValidEnv(state.config)
throw new Error('Invalid device configuration')
validateDependentState = (state) ->
if state.apps? and !validation.isValidDependentAppsObject(state.apps)
throw new Error('Invalid dependent apps')
if state.devices? and !validation.isValidDependentDevicesObject(state.devices)
throw new Error('Invalid dependent devices')
validateState = Promise.method (state) ->
if !_.isObject(state)
throw new Error('State must be an object')
if !_.isObject(state.local)
throw new Error('Local state must be an object')
validateLocalState(state.local)
if state.dependent?
validateDependentState(state.dependent)
# TODO (refactor): This shouldn't be here, and instead should be part of the other
# device api stuff in ./device-api
createDeviceStateRouter = (deviceState) ->
router = express.Router()
router.use(bodyParser.urlencoded(limit: '10mb', extended: true))
router.use(bodyParser.json(limit: '10mb'))
rebootOrShutdown = (req, res, action) ->
deviceState.config.get('lockOverride')
.then (lockOverride) ->
force = validation.checkTruthy(req.body.force) or lockOverride
deviceState.executeStepAction({ action }, { force })
.then (response) ->
res.status(202).json(response)
.catch (err) ->
if err instanceof UpdatesLockedError
status = 423
else
status = 500
res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' })
router.post '/v1/reboot', (req, res) ->
rebootOrShutdown(req, res, 'reboot')
router.post '/v1/shutdown', (req, res) ->
rebootOrShutdown(req, res, 'shutdown')
router.get '/v1/device/host-config', (req, res) ->
hostConfig.get()
.then (conf) ->
res.json(conf)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
router.patch '/v1/device/host-config', (req, res) ->
hostConfig.patch(req.body, deviceState.config)
.then ->
res.status(200).send('OK')
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
router.get '/v1/device', (req, res) ->
deviceState.getStatus()
.then (state) ->
stateToSend = _.pick(state.local, [
'api_port'
'ip_address'
'os_version'
'supervisor_version'
'update_pending'
'update_failed'
'update_downloaded'
])
if state.local.is_on__commit?
stateToSend.commit = state.local.is_on__commit
# Will produce nonsensical results for multicontainer apps...
service = _.toPairs(_.toPairs(state.local.apps)[0]?[1]?.services)[0]?[1]
if service?
stateToSend.status = service.status
# For backwards compatibility, we adapt Running to the old "Idle"
if stateToSend.status == 'Running'
stateToSend.status = 'Idle'
stateToSend.download_progress = service.download_progress
res.json(stateToSend)
.catch (err) ->
res.status(500).json({ Data: '', Error: err?.message or err or 'Unknown error' })
router.use(deviceState.applications.router)
return router
module.exports = class DeviceState extends EventEmitter
constructor: ({ @db, @config, @eventTracker, @logger }) ->
@deviceConfig = new DeviceConfig({ @db, @config, @logger })
@applications = new ApplicationManager({ @config, @logger, @db, @eventTracker, deviceState: this })
@on 'error', (err) ->
log.error('deviceState error: ', err)
@_currentVolatile = {}
@_writeLock = updateLock.writeLock
@_readLock = updateLock.readLock
@lastSuccessfulUpdate = null
@failedUpdates = 0
@applyInProgress = false
@applyCancelled = false
@cancelDelay = null
@lastApplyStart = process.hrtime()
@scheduledApply = null
@shuttingDown = false
@router = createDeviceStateRouter(this)
@on 'apply-target-state-end', (err) ->
if err?
if not (err instanceof UpdatesLockedError)
log.error('Device state apply error', err)
else
log.success('Device state apply success')
# We also let the device-config module know that we
# successfully reached the target state and that it
# should clear any rate limiting it's applied
@deviceConfig.resetRateLimits()
@applications.on('change', @reportCurrentState)
healthcheck: =>
@config.getMany([ 'unmanaged' ])
.then (conf) =>
cycleTime = process.hrtime(@lastApplyStart)
cycleTimeMs = cycleTime[0] * 1000 + cycleTime[1] / 1e6
cycleTimeWithinInterval = cycleTimeMs - @applications.timeSpentFetching < 2 * @maxPollTime
applyTargetHealthy = conf.unmanaged or !@applyInProgress or @applications.fetchesInProgress > 0 or cycleTimeWithinInterval
return applyTargetHealthy
init: ->
@config.on 'change', (changedConfig) =>
if changedConfig.loggingEnabled?
@logger.enable(changedConfig.loggingEnabled)
if changedConfig.apiSecret?
@reportCurrentState(api_secret: changedConfig.apiSecret)
if changedConfig.appUpdatePollInterval?
@maxPollTime = changedConfig.appUpdatePollInterval
@config.getMany([
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant',
'version', 'provisioned', 'apiEndpoint', 'connectivityCheckEnabled', 'legacyAppsPresent',
'targetStateSet', 'unmanaged', 'appUpdatePollInterval'
])
.then (conf) =>
@maxPollTime = conf.appUpdatePollInterval
@applications.init()
.then =>
if !conf.initialConfigSaved
@saveInitialConfig()
.then =>
@initNetworkChecks(conf)
log.info('Reporting initial state, supervisor version and API info')
@reportCurrentState(
api_port: conf.listenPort
api_secret: conf.apiSecret
os_version: conf.osVersion
os_variant: conf.osVariant
supervisor_version: conf.version
provisioning_progress: null
provisioning_state: ''
status: 'Idle'
logs_channel: null
update_failed: false
update_pending: false
update_downloaded: false
)
.then =>
@applications.getTargetApps()
.then (targetApps) =>
if !conf.provisioned or (_.isEmpty(targetApps) and !conf.targetStateSet)
loadTargetFromFile(null, this)
.finally =>
@config.set({ targetStateSet: 'true' })
else
log.debug('Skipping preloading')
if conf.provisioned and !_.isEmpty(targetApps)
# If we're in this case, it's because we've updated from an older supervisor
# and we need to mark that the target state has been set so that
# the supervisor doesn't try to preload again if in the future target
# apps are empty again (which may happen with multi-app).
@config.set({ targetStateSet: 'true' })
.then =>
@triggerApplyTarget({ initial: true })
initNetworkChecks: ({ apiEndpoint, connectivityCheckEnabled, unmanaged }) =>
network.startConnectivityCheck apiEndpoint, connectivityCheckEnabled, (connected) =>
@connected = connected
@config.on 'change', (changedConfig) ->
if changedConfig.connectivityCheckEnabled?
network.enableConnectivityCheck(changedConfig.connectivityCheckEnabled)
log.debug('Starting periodic check for IP addresses')
network.startIPAddressUpdate() (addresses) =>
@reportCurrentState(
ip_address: addresses.join(' ')
)
, constants.ipAddressUpdateInterval
saveInitialConfig: =>
@deviceConfig.getCurrent()
.then (devConf) =>
@deviceConfig.setTarget(devConf)
.then =>
@config.set({ initialConfigSaved: 'true' })
emitAsync: (ev, args...) =>
setImmediate => @emit(ev, args...)
_readLockTarget: =>
@_readLock('target').disposer (release) ->
release()
_writeLockTarget: =>
@_writeLock('target').disposer (release) ->
release()
_inferStepsLock: =>
@_writeLock('inferSteps').disposer (release) ->
release()
usingReadLockTarget: (fn) =>
Promise.using @_readLockTarget, -> fn()
usingWriteLockTarget: (fn) =>
Promise.using @_writeLockTarget, -> fn()
usingInferStepsLock: (fn) =>
Promise.using @_inferStepsLock, -> fn()
setTarget: (target, localSource = false) ->
# When we get a new target state, clear any built up apply errors
# This means that we can attempt to apply the new state instantly
@failedUpdates = 0
Promise.join(
@config.get('apiEndpoint'),
validateState(target),
(apiEndpoint) =>
@usingWriteLockTarget =>
# Apps, deviceConfig, dependent
@db.transaction (trx) =>
Promise.try =>
@config.set({ name: target.local.name }, trx)
.then =>
@deviceConfig.setTarget(target.local.config, trx)
.then =>
if localSource or not apiEndpoint
@applications.setTarget(target.local.apps, target.dependent, 'local', trx)
else
@applications.setTarget(target.local.apps, target.dependent, apiEndpoint, trx)
)
getTarget: ({ initial = false, intermediate = false } = {}) =>
@usingReadLockTarget =>
if intermediate
return @intermediateTarget
Promise.props({
local: Promise.props({
name: @config.get('name')
config: @deviceConfig.getTarget({ initial })
apps: @applications.getTargetApps()
})
dependent: @applications.getDependentTargets()
})
getStatus: ->
@applications.getStatus()
.then (appsStatus) =>
theState = { local: {}, dependent: {} }
_.merge(theState.local, @_currentVolatile)
theState.local.apps = appsStatus.local
theState.dependent.apps = appsStatus.dependent
if appsStatus.commit and !@applyInProgress
theState.local.is_on__commit = appsStatus.commit
return theState
getCurrentForComparison: ->
Promise.join(
@config.get('name')
@deviceConfig.getCurrent()
@applications.getCurrentForComparison()
@applications.getDependentState()
(name, devConfig, apps, dependent) ->
return {
local: {
name
config: devConfig
apps
}
dependent
}
)
reportCurrentState: (newState = {}) =>
_.assign(@_currentVolatile, newState)
@emitAsync('change')
reboot: (force, skipLock) =>
@applications.stopAll({ force, skipLock })
.then =>
@logger.logSystemMessage('Rebooting', {}, 'Reboot')
systemd.reboot()
.tap =>
@shuttingDown = true
@emitAsync('shutdown')
shutdown: (force, skipLock) =>
@applications.stopAll({ force, skipLock })
.then =>
@logger.logSystemMessage('Shutting down', {}, 'Shutdown')
systemd.shutdown()
.tap =>
@shuttingDown = true
@emitAsync('shutdown')
executeStepAction: (step, { force, initial, skipLock }) =>
Promise.try =>
if @deviceConfig.isValidAction(step.action)
@deviceConfig.executeStepAction(step, { initial })
else if _.includes(@applications.validActions, step.action)
@applications.executeStepAction(step, { force, skipLock })
else
switch step.action
when 'reboot'
# There isn't really a way that these methods can fail,
# and if they do, we wouldn't know about it until after
# the response has been sent back to the API. Just return
# "OK" for this and the below action
@reboot(force, skipLock).return(Data: 'OK', Error: null)
when 'shutdown'
@shutdown(force, skipLock).return(Data: 'OK', Error: null)
when 'noop'
Promise.resolve()
else
throw new Error("Invalid action #{step.action}")
applyStep: (step, { force, initial, intermediate, skipLock }) =>
if @shuttingDown
return
@executeStepAction(step, { force, initial, skipLock })
.tapCatch (err) =>
@emitAsync('step-error', err, step)
.then (stepResult) =>
@emitAsync('step-completed', null, step, stepResult)
applyError: (err, { force, initial, intermediate }) =>
@emitAsync('apply-target-state-error', err)
@emitAsync('apply-target-state-end', err)
if intermediate
throw err
@failedUpdates += 1
@reportCurrentState(update_failed: true)
if @scheduledApply?
if not (err instanceof UpdatesLockedError)
log.error("Updating failed, but there's another update scheduled immediately: ", err)
else
delay = Math.min((2 ** @failedUpdates) * constants.backoffIncrement, @maxPollTime)
# If there was an error then schedule another attempt briefly in the future.
if err instanceof UpdatesLockedError
message = "Updates are locked, retrying in #{prettyMs(delay, compact: true)}..."
@logger.logSystemMessage(message, {}, 'updateLocked', false)
log.info(message)
else
log.error("Scheduling another update attempt in #{delay}ms due to failure: ", err)
@triggerApplyTarget({ force, delay, initial })
applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false, nextDelay = 200, retryCount = 0 } = {}) =>
Promise.try =>
if !intermediate
@applyBlocker
.then =>
@applications.localModeSwitchCompletion()
.then =>
@usingInferStepsLock =>
Promise.all([
@getCurrentForComparison()
@getTarget({ initial, intermediate })
])
.then ([ currentState, targetState ]) =>
@applications.getExtraStateForComparison(currentState, targetState)
.then (extraState) =>
@deviceConfig.getRequiredSteps(currentState, targetState)
.then (deviceConfigSteps) =>
noConfigSteps = _.every(deviceConfigSteps, ({ action }) -> action is 'noop')
if not noConfigSteps
return [false, deviceConfigSteps]
else
@applications.getRequiredSteps(currentState, targetState, extraState, intermediate)
.then (appSteps) ->
# We need to forward to no-ops to the next step if the application state is done
# The true and false values below represent whether we should add an exponential
# backoff if the steps are all no-ops. the reason that this has to be different
# is that a noop step from the application manager means that we should keep running
# in a tight loop, waiting for an image to download (for example). The device-config
# steps generally have a much higher time to wait before the no-ops will stop, so we
# should try to reduce the effect that this will have
if _.isEmpty(appSteps) and noConfigSteps
return [true, deviceConfigSteps]
return [false, appSteps]
.then ([backoff, steps]) =>
if _.isEmpty(steps)
@emitAsync('apply-target-state-end', null)
if !intermediate
log.debug('Finished applying target state')
@applications.timeSpentFetching = 0
@failedUpdates = 0
@lastSuccessfulUpdate = Date.now()
@reportCurrentState(update_failed: false, update_pending: false, update_downloaded: false)
return
if !intermediate
@reportCurrentState(update_pending: true)
if _.every(steps, (step) -> step.action == 'noop')
if backoff
retryCount += 1
# Backoff to a maximum of 10 minutes
nextDelay = Math.min(2 ** retryCount * 1000, 60 * 10 * 1000)
else
nextDelay = 1000
Promise.map steps, (step) =>
@applyStep(step, { force, initial, intermediate, skipLock })
.delay(nextDelay)
.then =>
@applyTarget({ force, initial, intermediate, skipLock, nextDelay, retryCount })
.catch (err) =>
detailedError = new Error('Failed to apply state transition steps. ' + err.message + ' Steps:' + JSON.stringify(_.map(steps, 'action')))
@applyError(detailedError, { force, initial, intermediate })
.catch (err) =>
@applyError(err, { force, initial, intermediate })
pausingApply: (fn) =>
lock = =>
@_writeLock('pause').disposer (release) ->
release()
pause = =>
Promise.try =>
res = null
@applyBlocker = new Promise (resolve) ->
res = resolve
return res
.disposer (resolve) ->
resolve()
Promise.using lock(), ->
Promise.using pause(), ->
fn()
resumeNextApply: =>
@applyUnblocker?()
return
triggerApplyTarget: ({ force = false, delay = 0, initial = false, isFromApi = false } = {}) =>
if @applyInProgress
if !@scheduledApply? || (isFromApi && @cancelDelay)
@scheduledApply = { force, delay }
if isFromApi
# Cancel promise delay if call came from api to
# prevent waiting due to backoff (and if we've
# previously setup a delay)
@cancelDelay?()
else
# If a delay has been set it's because we need to hold off before applying again,
# so we need to respect the maximum delay that has been passed
@scheduledApply.delay = Math.max(delay, @scheduledApply.delay)
@scheduledApply.force or= force
return
@applyCancelled = false
@applyInProgress = true
new Promise (resolve, reject) =>
setTimeout(resolve, delay)
@cancelDelay = reject
.catch =>
@applyCancelled = true
.then =>
@cancelDelay = null
if @applyCancelled
log.info('Skipping applyTarget because of a cancellation')
return
@lastApplyStart = process.hrtime()
log.info('Applying target state')
@applyTarget({ force, initial })
.finally =>
@applyInProgress = false
@reportCurrentState()
if @scheduledApply?
@triggerApplyTarget(@scheduledApply)
@scheduledApply = null
return null
applyIntermediateTarget: (intermediateTarget, { force = false, skipLock = false } = {}) =>
@intermediateTarget = _.cloneDeep(intermediateTarget)
@applyTarget({ intermediate: true, force, skipLock })
.then =>
@intermediateTarget = null

43
src/device-state.d.ts vendored
View File

@ -1,43 +0,0 @@
import { EventEmitter } from 'events';
import { Router } from 'express';
import ApplicationManager from './application-manager';
import Config from './config';
import Database from './db';
import DeviceConfig from './device-config';
import EventTracker from './event-tracker';
import Logger from './logger';
// This is a very incomplete definition of the device state
// class, which should be rewritten in typescript soon
class DeviceState extends EventEmitter {
public applications: ApplicationManager;
public router: Router;
public deviceConfig: DeviceConfig;
public config: Config;
public eventTracker: EventTracker;
// FIXME: I should be removed once device-state is refactored
public connected: boolean;
public constructor(args: {
config: Config;
db: Database;
eventTracker: EventTracker;
logger: Logger;
});
public healthcheck(): Promise<void>;
public normaliseLegacy(client: PinejsClientRequest): Promise<void>;
public loadTargetFromFile(filename: string): Promise<void>;
public getTarget(): Promise<any>;
public setTarget(target: any): Promise<any>;
public triggerApplyTarget(opts: any): Promise<any>;
public reportCurrentState(state: any);
public getCurrentForComparison(): Promise<any>;
public getStatus(): Promise<any>;
public async init();
}
export = DeviceState;

886
src/device-state.ts Normal file
View File

@ -0,0 +1,886 @@
import * as Bluebird from 'bluebird';
import * as bodyParser from 'body-parser';
import { EventEmitter } from 'events';
import * as express from 'express';
import * as _ from 'lodash';
import StrictEventEmitter from 'strict-event-emitter-types';
import prettyMs = require('pretty-ms');
import Config, { ConfigType } from './config';
import Database from './db';
import EventTracker from './event-tracker';
import Logger from './logger';
import {
CompositionStep,
CompositionStepAction,
} from './compose/composition-steps';
import { loadTargetFromFile } from './device-state/preload';
import * as hostConfig from './host-config';
import constants = require('./lib/constants');
import { InternalInconsistencyError, UpdatesLockedError } from './lib/errors';
import * as systemd from './lib/systemd';
import * as updateLock from './lib/update-lock';
import * as validation from './lib/validation';
import * as network from './network';
import ApplicationManager = require('./application-manager');
import DeviceConfig, { ConfigStep } from './device-config';
import { log } from './lib/supervisor-console';
import {
DeviceReportFields,
DeviceStatus,
InstancedDeviceState,
TargetState,
} from './types/state';
function validateLocalState(state: any): asserts state is TargetState['local'] {
if (state.name != null) {
if (!validation.isValidShortText(state.name)) {
throw new Error('Invalid device name');
}
}
if (state.apps == null || !validation.isValidAppsObject(state.apps)) {
throw new Error('Invalid apps');
}
if (state.config == null || !validation.isValidEnv(state.config)) {
throw new Error('Invalid device configuration');
}
}
function validateDependentState(
state: any,
): asserts state is TargetState['dependent'] {
if (
state.apps != null &&
!validation.isValidDependentAppsObject(state.apps)
) {
throw new Error('Invalid dependent apps');
}
if (
state.devices != null &&
!validation.isValidDependentDevicesObject(state.devices)
) {
throw new Error('Invalid dependent devices');
}
}
function validateState(state: any): asserts state is TargetState {
if (!_.isObject(state)) {
throw new Error('State must be an object');
}
// these any typings seem unnecessary but the `isObject`
// call above tells typescript that state is of type
// `object` - which apparently does not allow any fields
// to be accessed
if (!_.isObject((state as any).local)) {
throw new Error('Local state must be an object');
}
validateLocalState((state as any).local);
if ((state as any).dependent != null) {
return validateDependentState((state as any).dependent);
}
}
// TODO (refactor): This shouldn't be here, and instead should be part of the other
// device api stuff in ./device-api
function createDeviceStateRouter(deviceState: DeviceState) {
const router = express.Router();
router.use(bodyParser.urlencoded({ limit: '10mb', extended: true }));
router.use(bodyParser.json({ limit: '10mb' }));
const rebootOrShutdown = async (
req: express.Request,
res: express.Response,
action: DeviceStateStepTarget,
) => {
const override = await deviceState.config.get('lockOverride');
const force = validation.checkTruthy(req.body.force) || override;
try {
const response = await deviceState.executeStepAction(
{ action },
{ force },
);
res.status(202).json(response);
} catch (e) {
const status = e instanceof UpdatesLockedError ? 423 : 500;
res.status(status).json({
Data: '',
Error: (e != null ? e.message : undefined) || e || 'Unknown error',
});
}
};
router.post('/v1/reboot', (req, res) => rebootOrShutdown(req, res, 'reboot'));
router.post('/v1/shutdown', (req, res) =>
rebootOrShutdown(req, res, 'shutdown'),
);
router.get('/v1/device/host-config', (_req, res) =>
hostConfig
.get()
.then(conf => res.json(conf))
.catch(err =>
res.status(503).send(err?.message ?? err ?? 'Unknown error'),
),
);
router.patch('/v1/device/host-config', (req, res) =>
hostConfig
.patch(req.body, deviceState.config)
.then(() => res.status(200).send('OK'))
.catch(err =>
res.status(503).send(err?.message ?? err ?? 'Unknown error'),
),
);
router.get('/v1/device', async (_req, res) => {
try {
const state = await deviceState.getStatus();
const stateToSend = _.pick(state.local, [
'api_port',
'ip_address',
'os_version',
'supervisor_version',
'update_pending',
'update_failed',
'update_downloaded',
]) as Dictionary<unknown>;
if (state.local?.is_on__commit != null) {
stateToSend.commit = state.local.is_on__commit;
}
const service = _.toPairs(
_.toPairs(state.local?.apps)[0]?.[1]?.services,
)[0]?.[1];
if (service != null) {
stateToSend.status = service.status;
if (stateToSend.status === 'Running') {
stateToSend.status = 'Idle';
}
stateToSend.download_progress = service.download_progress;
}
res.json(stateToSend);
} catch (e) {
res.status(500).json({
Data: '',
Error: (e != null ? e.message : undefined) || e || 'Unknown error',
});
}
});
router.use(deviceState.applications.router);
return router;
}
interface DeviceStateConstructOpts {
db: Database;
config: Config;
eventTracker: EventTracker;
logger: Logger;
}
interface DeviceStateEvents {
error: Error;
change: void;
shutdown: void;
'apply-target-state-end': Nullable<Error>;
'apply-target-state-error': Error;
'step-error': (
err: Nullable<Error>,
step: DeviceStateStep<PossibleStepTargets>,
) => void;
'step-completed': (
err: Nullable<Error>,
step: DeviceStateStep<PossibleStepTargets>,
result?: { Data: string; Error: Nullable<Error> },
) => void;
}
type DeviceStateEventEmitter = StrictEventEmitter<
EventEmitter,
DeviceStateEvents
>;
type DeviceStateStepTarget = 'reboot' | 'shutdown' | 'noop';
type PossibleStepTargets = CompositionStepAction | DeviceStateStepTarget;
type DeviceStateStep<T extends PossibleStepTargets> =
| {
action: 'reboot';
}
| { action: 'shutdown' }
| { action: 'noop' }
| CompositionStep<T extends CompositionStepAction ? T : never>
| ConfigStep;
export class DeviceState extends (EventEmitter as new () => DeviceStateEventEmitter) {
public db: Database;
public config: Config;
public eventTracker: EventTracker;
public logger: Logger;
public applications: ApplicationManager;
public deviceConfig: DeviceConfig;
private currentVolatile: DeviceReportFields = {};
private writeLock = updateLock.writeLock;
private readLock = updateLock.readLock;
private cancelDelay: null | (() => void) = null;
private maxPollTime: number;
private intermediateTarget: TargetState | null = null;
private applyBlocker: Nullable<Promise<void>>;
public lastSuccessfulUpdate: number | null = null;
public failedUpdates: number = 0;
public applyInProgress = false;
public applyCancelled = false;
public lastApplyStart = process.hrtime();
public scheduledApply: { force?: boolean; delay?: number } | null = null;
public shuttingDown = false;
public connected: boolean;
public router: express.Router;
constructor({ db, config, eventTracker, logger }: DeviceStateConstructOpts) {
super();
this.db = db;
this.config = config;
this.eventTracker = eventTracker;
this.logger = logger;
this.deviceConfig = new DeviceConfig({
db: this.db,
config: this.config,
logger: this.logger,
});
this.applications = new ApplicationManager({
config: this.config,
logger: this.logger,
db: this.db,
eventTracker: this.eventTracker,
deviceState: this,
});
this.on('error', err => log.error('deviceState error: ', err));
this.on('apply-target-state-end', function(err) {
if (err != null) {
if (!(err instanceof UpdatesLockedError)) {
return log.error('Device state apply error', err);
}
} else {
log.success('Device state apply success');
// We also let the device-config module know that we
// successfully reached the target state and that it
// should clear any rate limiting it's applied
return this.deviceConfig.resetRateLimits();
}
});
this.applications.on('change', d => this.reportCurrentState(d));
this.router = createDeviceStateRouter(this);
}
public async healthcheck() {
const unmanaged = await this.config.get('unmanaged');
const cycleTime = process.hrtime(this.lastApplyStart);
const cycleTimeMs = cycleTime[0] * 1000 + cycleTime[1] / 1e6;
const cycleTimeWithinInterval =
cycleTimeMs - this.applications.timeSpentFetching < 2 * this.maxPollTime;
const applyTargetHealthy =
unmanaged ||
!this.applyInProgress ||
this.applications.fetchesInProgress > 0 ||
cycleTimeWithinInterval;
return applyTargetHealthy;
}
public async init() {
this.config.on('change', changedConfig => {
if (changedConfig.loggingEnabled != null) {
this.logger.enable(changedConfig.loggingEnabled);
}
if (changedConfig.apiSecret != null) {
this.reportCurrentState({ api_secret: changedConfig.apiSecret });
}
if (changedConfig.appUpdatePollInterval != null) {
this.maxPollTime = changedConfig.appUpdatePollInterval;
}
});
const conf = await this.config.getMany([
'initialConfigSaved',
'listenPort',
'apiSecret',
'osVersion',
'osVariant',
'version',
'provisioned',
'apiEndpoint',
'connectivityCheckEnabled',
'legacyAppsPresent',
'targetStateSet',
'unmanaged',
'appUpdatePollInterval',
]);
this.maxPollTime = conf.appUpdatePollInterval;
await this.applications.init();
if (!conf.initialConfigSaved) {
return this.saveInitialConfig();
}
this.initNetworkChecks(conf);
log.info('Reporting initial state, supervisor version and API info');
await this.reportCurrentState({
api_port: conf.listenPort,
api_secret: conf.apiSecret,
os_version: conf.osVersion,
os_variant: conf.osVariant,
supervisor_version: conf.version,
provisioning_progress: null,
provisioning_state: '',
status: 'Idle',
logs_channel: null,
update_failed: false,
update_pending: false,
update_downloaded: false,
});
const targetApps = await this.applications.getTargetApps();
if (!conf.provisioned || (_.isEmpty(targetApps) && !conf.targetStateSet)) {
try {
await loadTargetFromFile(null, this);
} finally {
await this.config.set({ targetStateSet: true });
}
} else {
log.debug('Skipping preloading');
if (conf.provisioned && !_.isEmpty(targetApps)) {
// If we're in this case, it's because we've updated from an older supervisor
// and we need to mark that the target state has been set so that
// the supervisor doesn't try to preload again if in the future target
// apps are empty again (which may happen with multi-app).
await this.config.set({ targetStateSet: true });
}
}
await this.triggerApplyTarget({ initial: true });
}
public async initNetworkChecks({
apiEndpoint,
connectivityCheckEnabled,
}: {
apiEndpoint: ConfigType<'apiEndpoint'>;
connectivityCheckEnabled: ConfigType<'connectivityCheckEnabled'>;
}) {
network.startConnectivityCheck(
apiEndpoint,
connectivityCheckEnabled,
connected => {
return (this.connected = connected);
},
);
this.config.on('change', function(changedConfig) {
if (changedConfig.connectivityCheckEnabled != null) {
return network.enableConnectivityCheck(
changedConfig.connectivityCheckEnabled,
);
}
});
log.debug('Starting periodic check for IP addresses');
await network.startIPAddressUpdate()(async addresses => {
await this.reportCurrentState({
ip_address: addresses.join(' '),
});
}, constants.ipAddressUpdateInterval);
}
private async saveInitialConfig() {
const devConf = await this.deviceConfig.getCurrent();
await this.deviceConfig.setTarget(devConf);
await this.config.set({ initialConfigSaved: true });
}
// We keep compatibility with the StrictEventEmitter types
// from the outside, but within this function all hells
// breaks loose due to the liberal any casting
private emitAsync<T extends keyof DeviceStateEvents>(
ev: T,
...args: DeviceStateEvents[T] extends (...args: any) => void
? Parameters<DeviceStateEvents[T]>
: Array<DeviceStateEvents[T]>
) {
if (_.isArray(args)) {
return setImmediate(() => this.emit(ev as any, ...args));
} else {
return setImmediate(() => this.emit(ev as any, args));
}
}
private readLockTarget = () =>
this.readLock('target').disposer(release => release());
private writeLockTarget = () =>
this.writeLock('target').disposer(release => release());
private inferStepsLock = () =>
this.writeLock('inferSteps').disposer(release => release());
private usingReadLockTarget(fn: () => any) {
return Bluebird.using(this.readLockTarget, () => fn());
}
private usingWriteLockTarget(fn: () => any) {
return Bluebird.using(this.writeLockTarget, () => fn());
}
private usingInferStepsLock(fn: () => any) {
return Bluebird.using(this.inferStepsLock, () => fn());
}
public async setTarget(target: TargetState, localSource?: boolean) {
// When we get a new target state, clear any built up apply errors
// This means that we can attempt to apply the new state instantly
if (localSource == null) {
localSource = false;
}
this.failedUpdates = 0;
validateState(target);
const apiEndpoint = await this.config.get('apiEndpoint');
await this.usingWriteLockTarget(async () => {
await this.db.transaction(async trx => {
await this.config.set({ name: target.local.name }, trx);
await this.deviceConfig.setTarget(target.local.config, trx);
if (localSource || apiEndpoint == null) {
await this.applications.setTarget(
target.local.apps,
target.dependent,
'local',
trx,
);
} else {
await this.applications.setTarget(
target.local.apps,
target.dependent,
apiEndpoint,
trx,
);
}
});
});
}
public getTarget({
initial = false,
intermediate = false,
}: { initial?: boolean; intermediate?: boolean } = {}): Bluebird<
InstancedDeviceState
> {
return this.usingReadLockTarget(async () => {
if (intermediate) {
return this.intermediateTarget;
}
return {
local: {
name: await this.config.get('name'),
config: await this.deviceConfig.getTarget({ initial }),
apps: await this.applications.getTargetApps(),
},
dependent: await this.applications.getDependentTargets(),
};
}) as Bluebird<InstancedDeviceState>;
}
public async getStatus(): Promise<DeviceStatus> {
const appsStatus = await this.applications.getStatus();
const theState: DeepPartial<DeviceStatus> = {
local: {},
dependent: {},
};
theState.local = { ...theState.local, ...this.currentVolatile };
theState.local.apps = appsStatus.local;
theState.dependent!.apps = appsStatus.dependent;
if (appsStatus.commit && !this.applyInProgress) {
theState.local.is_on__commit = appsStatus.commit;
}
return theState as DeviceStatus;
}
public async getCurrentForComparison(): Promise<
DeviceStatus & { local: { name: string } }
> {
const [name, devConfig, apps, dependent] = await Promise.all([
this.config.get('name'),
this.deviceConfig.getCurrent(),
this.applications.getCurrentForComparison(),
this.applications.getDependentState(),
]);
return {
local: {
name,
config: devConfig,
apps,
},
dependent,
};
}
public reportCurrentState(newState: DeviceReportFields = {}) {
if (newState == null) {
newState = {};
}
this.currentVolatile = { ...this.currentVolatile, ...newState };
return this.emitAsync('change', undefined);
}
private async reboot(force?: boolean, skipLock?: boolean) {
await this.applications.stopAll({ force, skipLock });
this.logger.logSystemMessage('Rebooting', {}, 'Reboot');
const reboot = await systemd.reboot();
this.shuttingDown = true;
this.emitAsync('shutdown', undefined);
return reboot;
}
private async shutdown(force?: boolean, skipLock?: boolean) {
await this.applications.stopAll({ force, skipLock });
this.logger.logSystemMessage('Shutting down', {}, 'Shutdown');
const shutdown = await systemd.shutdown();
this.shuttingDown = true;
this.emitAsync('shutdown', undefined);
return shutdown;
}
public async executeStepAction<T extends PossibleStepTargets>(
step: DeviceStateStep<T>,
{
force,
initial,
skipLock,
}: { force?: boolean; initial?: boolean; skipLock?: boolean },
) {
if (this.deviceConfig.isValidAction(step.action)) {
await this.deviceConfig.executeStepAction(step as ConfigStep, {
initial,
});
} else if (_.includes(this.applications.validActions, step.action)) {
return this.applications.executeStepAction(step as any, {
force,
skipLock,
});
} else {
switch (step.action) {
case 'reboot':
// There isn't really a way that these methods can fail,
// and if they do, we wouldn't know about it until after
// the response has been sent back to the API. Just return
// "OK" for this and the below action
await this.reboot(force, skipLock);
return {
Data: 'OK',
Error: null,
};
case 'shutdown':
await this.shutdown(force, skipLock);
return {
Data: 'OK',
Error: null,
};
case 'noop':
return;
default:
throw new Error(`Invalid action ${step.action}`);
}
}
}
public async applyStep<T extends PossibleStepTargets>(
step: DeviceStateStep<T>,
{
force,
initial,
skipLock,
}: {
force?: boolean;
initial?: boolean;
skipLock?: boolean;
},
) {
if (this.shuttingDown) {
return;
}
try {
const stepResult = await this.executeStepAction(step, {
force,
initial,
skipLock,
});
this.emitAsync('step-completed', null, step, stepResult || undefined);
} catch (e) {
this.emitAsync('step-error', e, step);
throw e;
}
}
private applyError(
err: Error,
{
force,
initial,
intermediate,
}: { force?: boolean; initial?: boolean; intermediate?: boolean },
) {
this.emitAsync('apply-target-state-error', err);
this.emitAsync('apply-target-state-end', err);
if (intermediate) {
throw err;
}
this.failedUpdates += 1;
this.reportCurrentState({ update_failed: true });
if (this.scheduledApply != null) {
if (!(err instanceof UpdatesLockedError)) {
log.error(
"Updating failed, but there's another update scheduled immediately: ",
err,
);
}
} else {
const delay = Math.min(
Math.pow(2, this.failedUpdates) * constants.backoffIncrement,
this.maxPollTime,
);
// If there was an error then schedule another attempt briefly in the future.
if (err instanceof UpdatesLockedError) {
const message = `Updates are locked, retrying in ${prettyMs(delay, {
compact: true,
})}...`;
this.logger.logSystemMessage(message, {}, 'updateLocked', false);
log.info(message);
} else {
log.error(
`Scheduling another update attempt in ${delay}ms due to failure: `,
err,
);
}
return this.triggerApplyTarget({ force, delay, initial });
}
}
public async applyTarget({
force = false,
initial = false,
intermediate = false,
skipLock = false,
nextDelay = 200,
retryCount = 0,
} = {}) {
if (!intermediate) {
await this.applyBlocker;
}
await this.applications.localModeSwitchCompletion();
return this.usingInferStepsLock(async () => {
const [currentState, targetState] = await Promise.all([
this.getCurrentForComparison(),
this.getTarget({ initial, intermediate }),
]);
const extraState = await this.applications.getExtraStateForComparison(
currentState,
targetState,
);
const deviceConfigSteps = await this.deviceConfig.getRequiredSteps(
currentState,
targetState,
);
const noConfigSteps = _.every(
deviceConfigSteps,
({ action }) => action === 'noop',
);
let backoff: boolean;
let steps: Array<DeviceStateStep<PossibleStepTargets>>;
if (!noConfigSteps) {
backoff = false;
steps = deviceConfigSteps;
} else {
const appSteps = await this.applications.getRequiredSteps(
currentState,
targetState,
extraState,
intermediate,
);
if (_.isEmpty(appSteps)) {
// If we retrieve a bunch of no-ops from the
// device config, generally we want to back off
// more than if we retrieve them from the
// application manager
backoff = true;
steps = deviceConfigSteps;
} else {
backoff = false;
steps = appSteps;
}
}
if (_.isEmpty(steps)) {
this.emitAsync('apply-target-state-end', null);
if (!intermediate) {
log.debug('Finished applying target state');
this.applications.timeSpentFetching = 0;
this.failedUpdates = 0;
this.lastSuccessfulUpdate = Date.now();
this.reportCurrentState({
update_failed: false,
update_pending: false,
update_downloaded: false,
});
}
return;
}
if (!intermediate) {
this.reportCurrentState({ update_pending: true });
}
if (_.every(steps, step => step.action === 'noop')) {
if (backoff) {
retryCount += 1;
// Backoff to a maximum of 10 minutes
nextDelay = Math.min(Math.pow(2, retryCount) * 1000, 60 * 10 * 1000);
} else {
nextDelay = 1000;
}
}
try {
await Promise.all(
steps.map(s => this.applyStep(s, { force, initial, skipLock })),
);
await Bluebird.delay(nextDelay);
await this.applyTarget({
force,
initial,
intermediate,
skipLock,
nextDelay,
retryCount,
});
} catch (e) {
const detailedError = new Error(
'Failed to apply state transition steps. ' +
e.message +
' Steps:' +
JSON.stringify(_.map(steps, 'action')),
);
return this.applyError(detailedError, {
force,
initial,
intermediate,
});
}
}).catch(err => {
return this.applyError(err, { force, initial, intermediate });
});
}
public pausingApply(fn: () => any) {
const lock = () => {
return this.writeLock('pause').disposer(release => release());
};
// TODO: This function is a bit of a mess
const pause = () => {
return Bluebird.try(() => {
let res = null;
this.applyBlocker = new Promise(resolve => {
res = resolve;
});
return res;
}).disposer((resolve: any) => resolve());
};
return Bluebird.using(lock(), () => Bluebird.using(pause(), () => fn()));
}
public triggerApplyTarget({
force = false,
delay = 0,
initial = false,
isFromApi = false,
} = {}) {
if (this.applyInProgress) {
if (this.scheduledApply == null || (isFromApi && this.cancelDelay)) {
this.scheduledApply = { force, delay };
if (isFromApi) {
// Cancel promise delay if call came from api to
// prevent waiting due to backoff (and if we've
// previously setup a delay)
this.cancelDelay?.();
}
} else {
// If a delay has been set it's because we need to hold off before applying again,
// so we need to respect the maximum delay that has
// been passed
if (!this.scheduledApply.delay) {
throw new InternalInconsistencyError(
'No delay specified in scheduledApply',
);
}
this.scheduledApply.delay = Math.max(delay, this.scheduledApply.delay);
if (!this.scheduledApply.force) {
this.scheduledApply.force = force;
}
}
return;
}
this.applyCancelled = false;
this.applyInProgress = true;
new Bluebird((resolve, reject) => {
setTimeout(resolve, delay);
this.cancelDelay = reject;
})
.catch(() => {
this.applyCancelled = true;
})
.then(() => {
this.cancelDelay = null;
if (this.applyCancelled) {
log.info('Skipping applyTarget because of a cancellation');
return;
}
this.lastApplyStart = process.hrtime();
log.info('Applying target state');
return this.applyTarget({ force, initial });
})
.finally(() => {
this.applyInProgress = false;
this.reportCurrentState();
if (this.scheduledApply != null) {
this.triggerApplyTarget(this.scheduledApply);
this.scheduledApply = null;
}
});
return null;
}
public applyIntermediateTarget(
intermediateTarget: TargetState,
{ force = false, skipLock = false } = {},
) {
this.intermediateTarget = _.cloneDeep(intermediateTarget);
return this.applyTarget({ intermediate: true, force, skipLock }).then(
() => {
this.intermediateTarget = null;
},
);
}
}
export default DeviceState;

View File

@ -2,7 +2,7 @@ import * as _ from 'lodash';
import { fs } from 'mz';
import { Image } from '../compose/images';
import DeviceState = require('../device-state');
import DeviceState from '../device-state';
import constants = require('../lib/constants');
import { AppsJsonParseError, EISDIR, ENOENT } from '../lib/errors';
@ -34,7 +34,7 @@ export async function loadTargetFromFile(
if (_.isArray(stateFromFile)) {
log.debug('Detected a legacy apps.json, converting...');
stateFromFile = convertLegacyAppsJson(stateFromFile);
stateFromFile = convertLegacyAppsJson(stateFromFile as any[]);
}
const preloadState = stateFromFile as AppsJsonFormat;
@ -80,12 +80,15 @@ export async function loadTargetFromFile(
preloadState.config,
);
preloadState.config = { ...formattedConf, ...deviceConf };
const localState = { local: { name: '', ...preloadState } };
const localState = {
local: { name: '', ...preloadState },
dependent: { apps: [], devices: [] },
};
await deviceState.setTarget(localState);
log.success('Preloading complete');
if (stateFromFile.pinDevice) {
if (preloadState.pinDevice) {
// Multi-app warning!
// The following will need to be changed once running
// multiple applications is possible

View File

@ -9,10 +9,10 @@ import * as rimraf from 'rimraf';
const mkdirpAsync = Bluebird.promisify(mkdirp);
const rimrafAsync = Bluebird.promisify(rimraf);
import ApplicationManager from '../application-manager';
import ApplicationManager = require('../application-manager');
import Config from '../config';
import Database, { Transaction } from '../db';
import DeviceState = require('../device-state');
import DeviceState from '../device-state';
import * as constants from '../lib/constants';
import { BackupError, DatabaseParseError, NotFoundError } from '../lib/errors';
import { pathExistsOnHost } from '../lib/fs-utils';

View File

@ -1,13 +1,12 @@
import APIBinder from './api-binder';
import Config, { ConfigKey } from './config';
import Database from './db';
import DeviceState from './device-state';
import EventTracker from './event-tracker';
import { normaliseLegacyDatabase } from './lib/migration';
import Logger from './logger';
import SupervisorAPI from './supervisor-api';
import DeviceState = require('./device-state');
import constants = require('./lib/constants');
import log from './lib/supervisor-console';
import version = require('./lib/supervisor-version');

View File

@ -1,6 +1,6 @@
import * as _ from 'lodash';
import ApplicationManager from './application-manager';
import ApplicationManager = require('./application-manager');
import Config from './config';
import Database, { Transaction } from './db';

View File

@ -1,9 +1,29 @@
import { ComposeNetworkConfig } from '../compose/types/network';
import { ServiceComposeConfig } from '../compose/types/service';
import { ComposeVolumeConfig } from '../compose/volume';
import Volume, { ComposeVolumeConfig } from '../compose/volume';
import { EnvVarObject, LabelObject } from '../lib/types';
export interface DeviceApplicationState {
import Network from '../compose/network';
import Service from '../compose/service';
export type DeviceReportFields = Partial<{
api_port: number;
api_secret: string | null;
ip_address: string;
os_version: string | null;
os_variant: string | null;
supervisor_version: string;
provisioning_progress: null | number;
provisioning_state: string;
status: string;
update_failed: boolean;
update_pending: boolean;
update_downloaded: boolean;
is_on__commit: string;
logs_channel: null;
}>;
export interface DeviceStatus {
local?: {
config?: Dictionary<string>;
apps?: {
@ -17,7 +37,7 @@ export interface DeviceApplicationState {
};
};
};
};
} & DeviceReportFields;
// TODO: Type the dependent entry correctly
dependent?: any;
commit?: string;
@ -40,8 +60,8 @@ export interface TargetState {
imageId: number;
serviceName: string;
image: string;
running: boolean;
environment: EnvVarObject;
running?: boolean;
environment: Dictionary<string>;
} & ServiceComposeConfig;
};
volumes: Dictionary<Partial<ComposeVolumeConfig>>;
@ -52,14 +72,14 @@ export interface TargetState {
// TODO: Correctly type this once dependent devices are
// actually properly supported
dependent: {
apps: Dictionary<{
apps: Array<{
name?: string;
image?: string;
commit?: string;
config?: EnvVarObject;
environment?: EnvVarObject;
}>;
devices: Dictionary<{
devices: Array<{
name?: string;
apps?: Dictionary<{
config?: EnvVarObject;
@ -86,3 +106,28 @@ export type ApplicationDatabaseFormat = Array<{
networks: string;
volumes: string;
}>;
// This structure is the internal representation of both
// target and current state. We create instances of compose
// objects and these are what the state engine uses to
// detect what it should do to move between them
export interface InstancedAppState {
[appId: number]: {
appId: number;
commit: string;
releaseId: number;
name: string;
services: Service[];
volumes: Dictionary<Volume>;
networks: Dictionary<Network>;
};
}
export interface InstancedDeviceState {
local: {
name: string;
config: Dictionary<string>;
apps: InstancedAppState;
};
dependent: any;
}

View File

@ -12,7 +12,7 @@ const { expect } = chai;
import Config from '../src/config';
import { RPiConfigBackend } from '../src/config/backend';
import DB from '../src/db';
import DeviceState = require('../src/device-state');
import DeviceState from '../src/device-state';
import { loadTargetFromFile } from '../src/device-state/preload';
@ -112,6 +112,8 @@ const testTarget2 = {
labels: {},
},
},
volumes: {},
networks: {},
},
},
},
@ -310,7 +312,7 @@ describe('deviceState', () => {
});
it('emits a change event when a new state is reported', () => {
deviceState.reportCurrentState({ someStateDiff: 'someValue' });
deviceState.reportCurrentState({ someStateDiff: 'someValue' } as any);
return (expect as any)(deviceState).to.emit('change');
});
@ -339,7 +341,7 @@ describe('deviceState', () => {
});
it('does not allow setting an invalid target state', () => {
expect(deviceState.setTarget(testTargetInvalid)).to.be.rejected;
expect(deviceState.setTarget(testTargetInvalid as any)).to.be.rejected;
});
it('allows triggering applying the target state', done => {

View File

@ -11,7 +11,7 @@ const { expect } = chai;
import ApiBinder from '../src/api-binder';
import Config from '../src/config';
import DB from '../src/db';
import DeviceState = require('../src/device-state');
import DeviceState from '../src/device-state';
const initModels = async (obj: Dictionary<any>, filename: string) => {
prepare();

View File

@ -7,7 +7,7 @@ chai.use(require('chai-events'))
{ expect } = chai
prepare = require './lib/prepare'
DeviceState = require '../src/device-state'
{ DeviceState } = require '../src/device-state'
{ DB } = require('../src/db')
{ Config } = require('../src/config')
{ Service } = require '../src/compose/service'

View File

@ -1,6 +1,6 @@
import { expect } from 'chai';
import { APIBinder, APIBinderConstructOpts } from '../src/api-binder';
import { DeviceApplicationState } from '../src/types/state';
import { DeviceStatus } from '../src/types/state';
describe('APIBinder', () => {
let apiBinder: APIBinder;
@ -27,7 +27,7 @@ describe('APIBinder', () => {
is_on__commit: 'whatever',
},
dependent: { apps: {} },
} as DeviceApplicationState;
} as DeviceStatus;
it('should strip applications data', () => {
const result = apiBinder.stripDeviceStateInLocalMode(

6
typings/global.d.ts vendored
View File

@ -6,3 +6,9 @@ type Callback<T> = (err?: Error, res?: T) => void;
type Nullable<T> = T | null | undefined;
type Resolvable<T> = T | Promise<T>;
type UnwrappedPromise = T extends PromiseLike<infer U> ? U : T;
type DeepPartial<T> = T extends object
? { [K in keyof T]?: DeepPartial<T[K]> }
: T;