mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-05-31 06:41:05 +00:00
Convert device-state module to typescript
Change-type: patch Signed-off-by: Cameron Diver <cameron@balena.io>
This commit is contained in:
parent
2309442555
commit
c21da8f3db
@ -25,11 +25,11 @@ import {
|
|||||||
} from './lib/errors';
|
} from './lib/errors';
|
||||||
import * as request from './lib/request';
|
import * as request from './lib/request';
|
||||||
import { writeLock } from './lib/update-lock';
|
import { writeLock } from './lib/update-lock';
|
||||||
import { DeviceApplicationState, TargetState } from './types/state';
|
import { DeviceStatus, TargetState } from './types/state';
|
||||||
|
|
||||||
import log from './lib/supervisor-console';
|
import log from './lib/supervisor-console';
|
||||||
|
|
||||||
import DeviceState = require('./device-state');
|
import DeviceState from './device-state';
|
||||||
import Logger from './logger';
|
import Logger from './logger';
|
||||||
|
|
||||||
// The exponential backoff starts at 15s
|
// The exponential backoff starts at 15s
|
||||||
@ -80,11 +80,11 @@ export class APIBinder {
|
|||||||
public balenaApi: PinejsClientRequest | null = null;
|
public balenaApi: PinejsClientRequest | null = null;
|
||||||
// TODO{type}: Retype me when all types are sorted
|
// TODO{type}: Retype me when all types are sorted
|
||||||
private cachedBalenaApi: PinejsClientRequest | null = null;
|
private cachedBalenaApi: PinejsClientRequest | null = null;
|
||||||
private lastReportedState: DeviceApplicationState = {
|
private lastReportedState: DeviceStatus = {
|
||||||
local: {},
|
local: {},
|
||||||
dependent: {},
|
dependent: {},
|
||||||
};
|
};
|
||||||
private stateForReport: DeviceApplicationState = {
|
private stateForReport: DeviceStatus = {
|
||||||
local: {},
|
local: {},
|
||||||
dependent: {},
|
dependent: {},
|
||||||
};
|
};
|
||||||
@ -410,7 +410,7 @@ export class APIBinder {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private getStateDiff(): DeviceApplicationState {
|
private getStateDiff(): DeviceStatus {
|
||||||
const lastReportedLocal = this.lastReportedState.local;
|
const lastReportedLocal = this.lastReportedState.local;
|
||||||
const lastReportedDependent = this.lastReportedState.dependent;
|
const lastReportedDependent = this.lastReportedState.dependent;
|
||||||
if (lastReportedLocal == null || lastReportedDependent == null) {
|
if (lastReportedLocal == null || lastReportedDependent == null) {
|
||||||
@ -423,13 +423,13 @@ export class APIBinder {
|
|||||||
|
|
||||||
const diff = {
|
const diff = {
|
||||||
local: _(this.stateForReport.local)
|
local: _(this.stateForReport.local)
|
||||||
.omitBy((val, key: keyof DeviceApplicationState['local']) =>
|
.omitBy((val, key: keyof DeviceStatus['local']) =>
|
||||||
_.isEqual(lastReportedLocal[key], val),
|
_.isEqual(lastReportedLocal[key], val),
|
||||||
)
|
)
|
||||||
.omit(INTERNAL_STATE_KEYS)
|
.omit(INTERNAL_STATE_KEYS)
|
||||||
.value(),
|
.value(),
|
||||||
dependent: _(this.stateForReport.dependent)
|
dependent: _(this.stateForReport.dependent)
|
||||||
.omitBy((val, key: keyof DeviceApplicationState['dependent']) =>
|
.omitBy((val, key: keyof DeviceStatus['dependent']) =>
|
||||||
_.isEqual(lastReportedDependent[key], val),
|
_.isEqual(lastReportedDependent[key], val),
|
||||||
)
|
)
|
||||||
.omit(INTERNAL_STATE_KEYS)
|
.omit(INTERNAL_STATE_KEYS)
|
||||||
@ -440,7 +440,7 @@ export class APIBinder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async sendReportPatch(
|
private async sendReportPatch(
|
||||||
stateDiff: DeviceApplicationState,
|
stateDiff: DeviceStatus,
|
||||||
conf: { apiEndpoint: string; uuid: string; localMode: boolean },
|
conf: { apiEndpoint: string; uuid: string; localMode: boolean },
|
||||||
) {
|
) {
|
||||||
if (this.cachedBalenaApi == null) {
|
if (this.cachedBalenaApi == null) {
|
||||||
@ -478,9 +478,7 @@ export class APIBinder {
|
|||||||
|
|
||||||
// Returns an object that contains only status fields relevant for the local mode.
|
// Returns an object that contains only status fields relevant for the local mode.
|
||||||
// It basically removes information about applications state.
|
// It basically removes information about applications state.
|
||||||
public stripDeviceStateInLocalMode(
|
public stripDeviceStateInLocalMode(state: DeviceStatus): DeviceStatus {
|
||||||
state: DeviceApplicationState,
|
|
||||||
): DeviceApplicationState {
|
|
||||||
return {
|
return {
|
||||||
local: _.cloneDeep(
|
local: _.cloneDeep(
|
||||||
_.omit(state.local, 'apps', 'is_on__commit', 'logs_channel'),
|
_.omit(state.local, 'apps', 'is_on__commit', 'logs_channel'),
|
||||||
@ -710,6 +708,11 @@ export class APIBinder {
|
|||||||
);
|
);
|
||||||
const deviceId = await this.config.get('deviceId');
|
const deviceId = await this.config.get('deviceId');
|
||||||
|
|
||||||
|
if (!currentState.local.config) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
'No config defined in reportInitialEnv',
|
||||||
|
);
|
||||||
|
}
|
||||||
const currentConfig: Dictionary<string> = currentState.local.config;
|
const currentConfig: Dictionary<string> = currentState.local.config;
|
||||||
for (const [key, value] of _.toPairs(currentConfig)) {
|
for (const [key, value] of _.toPairs(currentConfig)) {
|
||||||
let varValue = value;
|
let varValue = value;
|
||||||
|
19
src/application-manager.d.ts
vendored
19
src/application-manager.d.ts
vendored
@ -11,6 +11,7 @@ import { DeviceStatus, InstancedAppState } from './types/state';
|
|||||||
import ImageManager, { Image } from './compose/images';
|
import ImageManager, { Image } from './compose/images';
|
||||||
import ServiceManager from './compose/service-manager';
|
import ServiceManager from './compose/service-manager';
|
||||||
import DB from './db';
|
import DB from './db';
|
||||||
|
import DeviceState from './device-state';
|
||||||
|
|
||||||
import { APIBinder } from './api-binder';
|
import { APIBinder } from './api-binder';
|
||||||
import Config from './config';
|
import Config from './config';
|
||||||
@ -49,7 +50,7 @@ class ApplicationManager extends EventEmitter {
|
|||||||
// typecript, type the following
|
// typecript, type the following
|
||||||
public _lockingIfNecessary: any;
|
public _lockingIfNecessary: any;
|
||||||
public logger: Logger;
|
public logger: Logger;
|
||||||
public deviceState: any;
|
public deviceState: DeviceState;
|
||||||
public eventTracker: EventTracker;
|
public eventTracker: EventTracker;
|
||||||
public apiBinder: APIBinder;
|
public apiBinder: APIBinder;
|
||||||
public docker: DockerUtils;
|
public docker: DockerUtils;
|
||||||
@ -62,6 +63,22 @@ class ApplicationManager extends EventEmitter {
|
|||||||
public images: ImageManager;
|
public images: ImageManager;
|
||||||
|
|
||||||
public proxyvisor: any;
|
public proxyvisor: any;
|
||||||
|
public timeSpentFetching: number;
|
||||||
|
public fetchesInProgress: number;
|
||||||
|
|
||||||
|
public validActions: string[];
|
||||||
|
|
||||||
|
public router: Router;
|
||||||
|
|
||||||
|
public constructor({
|
||||||
|
logger: Logger,
|
||||||
|
config: Config,
|
||||||
|
db: DB,
|
||||||
|
eventTracker: EventTracker,
|
||||||
|
deviceState: DeviceState,
|
||||||
|
});
|
||||||
|
|
||||||
|
public init(): Promise<void>;
|
||||||
|
|
||||||
public getCurrentApp(appId: number): Bluebird<Application | null>;
|
public getCurrentApp(appId: number): Bluebird<Application | null>;
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ import * as _ from 'lodash';
|
|||||||
|
|
||||||
import Config from '../config';
|
import Config from '../config';
|
||||||
|
|
||||||
import ApplicationManager from '../application-manager';
|
import ApplicationManager = require('../application-manager');
|
||||||
import Images, { Image } from './images';
|
import Images, { Image } from './images';
|
||||||
import Network from './network';
|
import Network from './network';
|
||||||
import Service from './service';
|
import Service from './service';
|
||||||
@ -95,9 +95,9 @@ interface CompositionStepArgs {
|
|||||||
ensureSupervisorNetwork: {};
|
ensureSupervisorNetwork: {};
|
||||||
}
|
}
|
||||||
|
|
||||||
type CompositionStepAction = keyof CompositionStepArgs;
|
export type CompositionStepAction = keyof CompositionStepArgs;
|
||||||
type CompositionStep<T extends CompositionStepAction> = {
|
export type CompositionStep<T extends CompositionStepAction> = {
|
||||||
step: T;
|
action: T;
|
||||||
} & CompositionStepArgs[T];
|
} & CompositionStepArgs[T];
|
||||||
|
|
||||||
export function generateStep<T extends CompositionStepAction>(
|
export function generateStep<T extends CompositionStepAction>(
|
||||||
@ -105,7 +105,7 @@ export function generateStep<T extends CompositionStepAction>(
|
|||||||
args: CompositionStepArgs[T],
|
args: CompositionStepArgs[T],
|
||||||
): CompositionStep<T> {
|
): CompositionStep<T> {
|
||||||
return {
|
return {
|
||||||
step: action,
|
action,
|
||||||
...args,
|
...args,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -410,6 +410,7 @@ export class Service {
|
|||||||
user: '',
|
user: '',
|
||||||
workingDir: '',
|
workingDir: '',
|
||||||
tty: true,
|
tty: true,
|
||||||
|
running: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Mutate service with extra features
|
// Mutate service with extra features
|
||||||
|
@ -54,7 +54,7 @@ export interface ServiceComposeConfig {
|
|||||||
image: string;
|
image: string;
|
||||||
init?: string | boolean;
|
init?: string | boolean;
|
||||||
labels?: { [labelName: string]: string };
|
labels?: { [labelName: string]: string };
|
||||||
running: boolean;
|
running?: boolean;
|
||||||
networkMode?: string;
|
networkMode?: string;
|
||||||
networks?: string[] | ServiceNetworkDictionary;
|
networks?: string[] | ServiceNetworkDictionary;
|
||||||
pid?: string;
|
pid?: string;
|
||||||
|
@ -3,7 +3,7 @@ import { NextFunction, Request, Response, Router } from 'express';
|
|||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
import { fs } from 'mz';
|
import { fs } from 'mz';
|
||||||
|
|
||||||
import { ApplicationManager } from '../application-manager';
|
import ApplicationManager = require('../application-manager');
|
||||||
import { Service } from '../compose/service';
|
import { Service } from '../compose/service';
|
||||||
import {
|
import {
|
||||||
appNotFoundMessage,
|
appNotFoundMessage,
|
||||||
|
@ -12,7 +12,7 @@ import { UnitNotLoadedError } from './lib/errors';
|
|||||||
import * as systemd from './lib/systemd';
|
import * as systemd from './lib/systemd';
|
||||||
import { EnvVarObject } from './lib/types';
|
import { EnvVarObject } from './lib/types';
|
||||||
import { checkInt, checkTruthy } from './lib/validation';
|
import { checkInt, checkTruthy } from './lib/validation';
|
||||||
import { DeviceApplicationState } from './types/state';
|
import { DeviceStatus } from './types/state';
|
||||||
|
|
||||||
const vpnServiceName = 'openvpn-resin';
|
const vpnServiceName = 'openvpn-resin';
|
||||||
|
|
||||||
@ -29,7 +29,9 @@ interface ConfigOption {
|
|||||||
rebootRequired?: boolean;
|
rebootRequired?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ConfigStep {
|
// FIXME: Bring this and the deviceState and
|
||||||
|
// applicationState steps together
|
||||||
|
export interface ConfigStep {
|
||||||
// TODO: This is a bit of a mess, the DeviceConfig class shouldn't
|
// TODO: This is a bit of a mess, the DeviceConfig class shouldn't
|
||||||
// know that the reboot action exists as it is implemented by
|
// know that the reboot action exists as it is implemented by
|
||||||
// DeviceState. Fix this weird circular dependency
|
// DeviceState. Fix this weird circular dependency
|
||||||
@ -367,8 +369,8 @@ export class DeviceConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async getRequiredSteps(
|
public async getRequiredSteps(
|
||||||
currentState: DeviceApplicationState,
|
currentState: DeviceStatus,
|
||||||
targetState: DeviceApplicationState,
|
targetState: { local?: { config?: Dictionary<string> } },
|
||||||
): Promise<ConfigStep[]> {
|
): Promise<ConfigStep[]> {
|
||||||
const current: Dictionary<string> = _.get(
|
const current: Dictionary<string> = _.get(
|
||||||
currentState,
|
currentState,
|
||||||
|
@ -1,512 +0,0 @@
|
|||||||
Promise = require 'bluebird'
|
|
||||||
_ = require 'lodash'
|
|
||||||
EventEmitter = require 'events'
|
|
||||||
express = require 'express'
|
|
||||||
bodyParser = require 'body-parser'
|
|
||||||
prettyMs = require 'pretty-ms'
|
|
||||||
hostConfig = require './host-config'
|
|
||||||
network = require './network'
|
|
||||||
|
|
||||||
constants = require './lib/constants'
|
|
||||||
validation = require './lib/validation'
|
|
||||||
systemd = require './lib/systemd'
|
|
||||||
updateLock = require './lib/update-lock'
|
|
||||||
{ loadTargetFromFile } = require './device-state/preload'
|
|
||||||
{ UpdatesLockedError } = require './lib/errors'
|
|
||||||
|
|
||||||
{ DeviceConfig } = require './device-config'
|
|
||||||
ApplicationManager = require './application-manager'
|
|
||||||
|
|
||||||
{ log } = require './lib/supervisor-console'
|
|
||||||
|
|
||||||
validateLocalState = (state) ->
|
|
||||||
if state.name?
|
|
||||||
throw new Error('Invalid device name') if not validation.isValidShortText(state.name)
|
|
||||||
if !state.apps? or !validation.isValidAppsObject(state.apps)
|
|
||||||
throw new Error('Invalid apps')
|
|
||||||
if !state.config? or !validation.isValidEnv(state.config)
|
|
||||||
throw new Error('Invalid device configuration')
|
|
||||||
|
|
||||||
validateDependentState = (state) ->
|
|
||||||
if state.apps? and !validation.isValidDependentAppsObject(state.apps)
|
|
||||||
throw new Error('Invalid dependent apps')
|
|
||||||
if state.devices? and !validation.isValidDependentDevicesObject(state.devices)
|
|
||||||
throw new Error('Invalid dependent devices')
|
|
||||||
|
|
||||||
validateState = Promise.method (state) ->
|
|
||||||
if !_.isObject(state)
|
|
||||||
throw new Error('State must be an object')
|
|
||||||
if !_.isObject(state.local)
|
|
||||||
throw new Error('Local state must be an object')
|
|
||||||
validateLocalState(state.local)
|
|
||||||
if state.dependent?
|
|
||||||
validateDependentState(state.dependent)
|
|
||||||
|
|
||||||
# TODO (refactor): This shouldn't be here, and instead should be part of the other
|
|
||||||
# device api stuff in ./device-api
|
|
||||||
createDeviceStateRouter = (deviceState) ->
|
|
||||||
router = express.Router()
|
|
||||||
router.use(bodyParser.urlencoded(limit: '10mb', extended: true))
|
|
||||||
router.use(bodyParser.json(limit: '10mb'))
|
|
||||||
|
|
||||||
rebootOrShutdown = (req, res, action) ->
|
|
||||||
deviceState.config.get('lockOverride')
|
|
||||||
.then (lockOverride) ->
|
|
||||||
force = validation.checkTruthy(req.body.force) or lockOverride
|
|
||||||
deviceState.executeStepAction({ action }, { force })
|
|
||||||
.then (response) ->
|
|
||||||
res.status(202).json(response)
|
|
||||||
.catch (err) ->
|
|
||||||
if err instanceof UpdatesLockedError
|
|
||||||
status = 423
|
|
||||||
else
|
|
||||||
status = 500
|
|
||||||
res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' })
|
|
||||||
|
|
||||||
router.post '/v1/reboot', (req, res) ->
|
|
||||||
rebootOrShutdown(req, res, 'reboot')
|
|
||||||
|
|
||||||
router.post '/v1/shutdown', (req, res) ->
|
|
||||||
rebootOrShutdown(req, res, 'shutdown')
|
|
||||||
|
|
||||||
router.get '/v1/device/host-config', (req, res) ->
|
|
||||||
hostConfig.get()
|
|
||||||
.then (conf) ->
|
|
||||||
res.json(conf)
|
|
||||||
.catch (err) ->
|
|
||||||
res.status(503).send(err?.message or err or 'Unknown error')
|
|
||||||
|
|
||||||
router.patch '/v1/device/host-config', (req, res) ->
|
|
||||||
hostConfig.patch(req.body, deviceState.config)
|
|
||||||
.then ->
|
|
||||||
res.status(200).send('OK')
|
|
||||||
.catch (err) ->
|
|
||||||
res.status(503).send(err?.message or err or 'Unknown error')
|
|
||||||
|
|
||||||
router.get '/v1/device', (req, res) ->
|
|
||||||
deviceState.getStatus()
|
|
||||||
.then (state) ->
|
|
||||||
stateToSend = _.pick(state.local, [
|
|
||||||
'api_port'
|
|
||||||
'ip_address'
|
|
||||||
'os_version'
|
|
||||||
'supervisor_version'
|
|
||||||
'update_pending'
|
|
||||||
'update_failed'
|
|
||||||
'update_downloaded'
|
|
||||||
])
|
|
||||||
if state.local.is_on__commit?
|
|
||||||
stateToSend.commit = state.local.is_on__commit
|
|
||||||
# Will produce nonsensical results for multicontainer apps...
|
|
||||||
service = _.toPairs(_.toPairs(state.local.apps)[0]?[1]?.services)[0]?[1]
|
|
||||||
if service?
|
|
||||||
stateToSend.status = service.status
|
|
||||||
# For backwards compatibility, we adapt Running to the old "Idle"
|
|
||||||
if stateToSend.status == 'Running'
|
|
||||||
stateToSend.status = 'Idle'
|
|
||||||
stateToSend.download_progress = service.download_progress
|
|
||||||
res.json(stateToSend)
|
|
||||||
.catch (err) ->
|
|
||||||
res.status(500).json({ Data: '', Error: err?.message or err or 'Unknown error' })
|
|
||||||
|
|
||||||
router.use(deviceState.applications.router)
|
|
||||||
return router
|
|
||||||
|
|
||||||
module.exports = class DeviceState extends EventEmitter
|
|
||||||
constructor: ({ @db, @config, @eventTracker, @logger }) ->
|
|
||||||
@deviceConfig = new DeviceConfig({ @db, @config, @logger })
|
|
||||||
@applications = new ApplicationManager({ @config, @logger, @db, @eventTracker, deviceState: this })
|
|
||||||
@on 'error', (err) ->
|
|
||||||
log.error('deviceState error: ', err)
|
|
||||||
@_currentVolatile = {}
|
|
||||||
@_writeLock = updateLock.writeLock
|
|
||||||
@_readLock = updateLock.readLock
|
|
||||||
@lastSuccessfulUpdate = null
|
|
||||||
@failedUpdates = 0
|
|
||||||
@applyInProgress = false
|
|
||||||
@applyCancelled = false
|
|
||||||
@cancelDelay = null
|
|
||||||
@lastApplyStart = process.hrtime()
|
|
||||||
@scheduledApply = null
|
|
||||||
@shuttingDown = false
|
|
||||||
@router = createDeviceStateRouter(this)
|
|
||||||
@on 'apply-target-state-end', (err) ->
|
|
||||||
if err?
|
|
||||||
if not (err instanceof UpdatesLockedError)
|
|
||||||
log.error('Device state apply error', err)
|
|
||||||
else
|
|
||||||
log.success('Device state apply success')
|
|
||||||
# We also let the device-config module know that we
|
|
||||||
# successfully reached the target state and that it
|
|
||||||
# should clear any rate limiting it's applied
|
|
||||||
@deviceConfig.resetRateLimits()
|
|
||||||
@applications.on('change', @reportCurrentState)
|
|
||||||
|
|
||||||
healthcheck: =>
|
|
||||||
@config.getMany([ 'unmanaged' ])
|
|
||||||
.then (conf) =>
|
|
||||||
cycleTime = process.hrtime(@lastApplyStart)
|
|
||||||
cycleTimeMs = cycleTime[0] * 1000 + cycleTime[1] / 1e6
|
|
||||||
cycleTimeWithinInterval = cycleTimeMs - @applications.timeSpentFetching < 2 * @maxPollTime
|
|
||||||
applyTargetHealthy = conf.unmanaged or !@applyInProgress or @applications.fetchesInProgress > 0 or cycleTimeWithinInterval
|
|
||||||
return applyTargetHealthy
|
|
||||||
|
|
||||||
init: ->
|
|
||||||
@config.on 'change', (changedConfig) =>
|
|
||||||
if changedConfig.loggingEnabled?
|
|
||||||
@logger.enable(changedConfig.loggingEnabled)
|
|
||||||
if changedConfig.apiSecret?
|
|
||||||
@reportCurrentState(api_secret: changedConfig.apiSecret)
|
|
||||||
if changedConfig.appUpdatePollInterval?
|
|
||||||
@maxPollTime = changedConfig.appUpdatePollInterval
|
|
||||||
|
|
||||||
@config.getMany([
|
|
||||||
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant',
|
|
||||||
'version', 'provisioned', 'apiEndpoint', 'connectivityCheckEnabled', 'legacyAppsPresent',
|
|
||||||
'targetStateSet', 'unmanaged', 'appUpdatePollInterval'
|
|
||||||
])
|
|
||||||
.then (conf) =>
|
|
||||||
@maxPollTime = conf.appUpdatePollInterval
|
|
||||||
@applications.init()
|
|
||||||
.then =>
|
|
||||||
if !conf.initialConfigSaved
|
|
||||||
@saveInitialConfig()
|
|
||||||
.then =>
|
|
||||||
@initNetworkChecks(conf)
|
|
||||||
log.info('Reporting initial state, supervisor version and API info')
|
|
||||||
@reportCurrentState(
|
|
||||||
api_port: conf.listenPort
|
|
||||||
api_secret: conf.apiSecret
|
|
||||||
os_version: conf.osVersion
|
|
||||||
os_variant: conf.osVariant
|
|
||||||
supervisor_version: conf.version
|
|
||||||
provisioning_progress: null
|
|
||||||
provisioning_state: ''
|
|
||||||
status: 'Idle'
|
|
||||||
logs_channel: null
|
|
||||||
update_failed: false
|
|
||||||
update_pending: false
|
|
||||||
update_downloaded: false
|
|
||||||
)
|
|
||||||
.then =>
|
|
||||||
@applications.getTargetApps()
|
|
||||||
.then (targetApps) =>
|
|
||||||
if !conf.provisioned or (_.isEmpty(targetApps) and !conf.targetStateSet)
|
|
||||||
loadTargetFromFile(null, this)
|
|
||||||
.finally =>
|
|
||||||
@config.set({ targetStateSet: 'true' })
|
|
||||||
else
|
|
||||||
log.debug('Skipping preloading')
|
|
||||||
if conf.provisioned and !_.isEmpty(targetApps)
|
|
||||||
# If we're in this case, it's because we've updated from an older supervisor
|
|
||||||
# and we need to mark that the target state has been set so that
|
|
||||||
# the supervisor doesn't try to preload again if in the future target
|
|
||||||
# apps are empty again (which may happen with multi-app).
|
|
||||||
@config.set({ targetStateSet: 'true' })
|
|
||||||
.then =>
|
|
||||||
@triggerApplyTarget({ initial: true })
|
|
||||||
|
|
||||||
initNetworkChecks: ({ apiEndpoint, connectivityCheckEnabled, unmanaged }) =>
|
|
||||||
network.startConnectivityCheck apiEndpoint, connectivityCheckEnabled, (connected) =>
|
|
||||||
@connected = connected
|
|
||||||
@config.on 'change', (changedConfig) ->
|
|
||||||
if changedConfig.connectivityCheckEnabled?
|
|
||||||
network.enableConnectivityCheck(changedConfig.connectivityCheckEnabled)
|
|
||||||
log.debug('Starting periodic check for IP addresses')
|
|
||||||
network.startIPAddressUpdate() (addresses) =>
|
|
||||||
@reportCurrentState(
|
|
||||||
ip_address: addresses.join(' ')
|
|
||||||
)
|
|
||||||
, constants.ipAddressUpdateInterval
|
|
||||||
|
|
||||||
saveInitialConfig: =>
|
|
||||||
@deviceConfig.getCurrent()
|
|
||||||
.then (devConf) =>
|
|
||||||
@deviceConfig.setTarget(devConf)
|
|
||||||
.then =>
|
|
||||||
@config.set({ initialConfigSaved: 'true' })
|
|
||||||
|
|
||||||
emitAsync: (ev, args...) =>
|
|
||||||
setImmediate => @emit(ev, args...)
|
|
||||||
|
|
||||||
_readLockTarget: =>
|
|
||||||
@_readLock('target').disposer (release) ->
|
|
||||||
release()
|
|
||||||
_writeLockTarget: =>
|
|
||||||
@_writeLock('target').disposer (release) ->
|
|
||||||
release()
|
|
||||||
_inferStepsLock: =>
|
|
||||||
@_writeLock('inferSteps').disposer (release) ->
|
|
||||||
release()
|
|
||||||
|
|
||||||
usingReadLockTarget: (fn) =>
|
|
||||||
Promise.using @_readLockTarget, -> fn()
|
|
||||||
usingWriteLockTarget: (fn) =>
|
|
||||||
Promise.using @_writeLockTarget, -> fn()
|
|
||||||
usingInferStepsLock: (fn) =>
|
|
||||||
Promise.using @_inferStepsLock, -> fn()
|
|
||||||
|
|
||||||
setTarget: (target, localSource = false) ->
|
|
||||||
# When we get a new target state, clear any built up apply errors
|
|
||||||
# This means that we can attempt to apply the new state instantly
|
|
||||||
@failedUpdates = 0
|
|
||||||
|
|
||||||
Promise.join(
|
|
||||||
@config.get('apiEndpoint'),
|
|
||||||
validateState(target),
|
|
||||||
(apiEndpoint) =>
|
|
||||||
@usingWriteLockTarget =>
|
|
||||||
# Apps, deviceConfig, dependent
|
|
||||||
@db.transaction (trx) =>
|
|
||||||
Promise.try =>
|
|
||||||
@config.set({ name: target.local.name }, trx)
|
|
||||||
.then =>
|
|
||||||
@deviceConfig.setTarget(target.local.config, trx)
|
|
||||||
.then =>
|
|
||||||
if localSource or not apiEndpoint
|
|
||||||
@applications.setTarget(target.local.apps, target.dependent, 'local', trx)
|
|
||||||
else
|
|
||||||
@applications.setTarget(target.local.apps, target.dependent, apiEndpoint, trx)
|
|
||||||
)
|
|
||||||
|
|
||||||
getTarget: ({ initial = false, intermediate = false } = {}) =>
|
|
||||||
@usingReadLockTarget =>
|
|
||||||
if intermediate
|
|
||||||
return @intermediateTarget
|
|
||||||
Promise.props({
|
|
||||||
local: Promise.props({
|
|
||||||
name: @config.get('name')
|
|
||||||
config: @deviceConfig.getTarget({ initial })
|
|
||||||
apps: @applications.getTargetApps()
|
|
||||||
})
|
|
||||||
dependent: @applications.getDependentTargets()
|
|
||||||
})
|
|
||||||
|
|
||||||
getStatus: ->
|
|
||||||
@applications.getStatus()
|
|
||||||
.then (appsStatus) =>
|
|
||||||
theState = { local: {}, dependent: {} }
|
|
||||||
_.merge(theState.local, @_currentVolatile)
|
|
||||||
theState.local.apps = appsStatus.local
|
|
||||||
theState.dependent.apps = appsStatus.dependent
|
|
||||||
if appsStatus.commit and !@applyInProgress
|
|
||||||
theState.local.is_on__commit = appsStatus.commit
|
|
||||||
return theState
|
|
||||||
|
|
||||||
getCurrentForComparison: ->
|
|
||||||
Promise.join(
|
|
||||||
@config.get('name')
|
|
||||||
@deviceConfig.getCurrent()
|
|
||||||
@applications.getCurrentForComparison()
|
|
||||||
@applications.getDependentState()
|
|
||||||
(name, devConfig, apps, dependent) ->
|
|
||||||
return {
|
|
||||||
local: {
|
|
||||||
name
|
|
||||||
config: devConfig
|
|
||||||
apps
|
|
||||||
}
|
|
||||||
dependent
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
reportCurrentState: (newState = {}) =>
|
|
||||||
_.assign(@_currentVolatile, newState)
|
|
||||||
@emitAsync('change')
|
|
||||||
|
|
||||||
reboot: (force, skipLock) =>
|
|
||||||
@applications.stopAll({ force, skipLock })
|
|
||||||
.then =>
|
|
||||||
@logger.logSystemMessage('Rebooting', {}, 'Reboot')
|
|
||||||
systemd.reboot()
|
|
||||||
.tap =>
|
|
||||||
@shuttingDown = true
|
|
||||||
@emitAsync('shutdown')
|
|
||||||
|
|
||||||
shutdown: (force, skipLock) =>
|
|
||||||
@applications.stopAll({ force, skipLock })
|
|
||||||
.then =>
|
|
||||||
@logger.logSystemMessage('Shutting down', {}, 'Shutdown')
|
|
||||||
systemd.shutdown()
|
|
||||||
.tap =>
|
|
||||||
@shuttingDown = true
|
|
||||||
@emitAsync('shutdown')
|
|
||||||
|
|
||||||
executeStepAction: (step, { force, initial, skipLock }) =>
|
|
||||||
Promise.try =>
|
|
||||||
if @deviceConfig.isValidAction(step.action)
|
|
||||||
@deviceConfig.executeStepAction(step, { initial })
|
|
||||||
else if _.includes(@applications.validActions, step.action)
|
|
||||||
@applications.executeStepAction(step, { force, skipLock })
|
|
||||||
else
|
|
||||||
switch step.action
|
|
||||||
when 'reboot'
|
|
||||||
# There isn't really a way that these methods can fail,
|
|
||||||
# and if they do, we wouldn't know about it until after
|
|
||||||
# the response has been sent back to the API. Just return
|
|
||||||
# "OK" for this and the below action
|
|
||||||
@reboot(force, skipLock).return(Data: 'OK', Error: null)
|
|
||||||
when 'shutdown'
|
|
||||||
@shutdown(force, skipLock).return(Data: 'OK', Error: null)
|
|
||||||
when 'noop'
|
|
||||||
Promise.resolve()
|
|
||||||
else
|
|
||||||
throw new Error("Invalid action #{step.action}")
|
|
||||||
|
|
||||||
applyStep: (step, { force, initial, intermediate, skipLock }) =>
|
|
||||||
if @shuttingDown
|
|
||||||
return
|
|
||||||
@executeStepAction(step, { force, initial, skipLock })
|
|
||||||
.tapCatch (err) =>
|
|
||||||
@emitAsync('step-error', err, step)
|
|
||||||
.then (stepResult) =>
|
|
||||||
@emitAsync('step-completed', null, step, stepResult)
|
|
||||||
|
|
||||||
applyError: (err, { force, initial, intermediate }) =>
|
|
||||||
@emitAsync('apply-target-state-error', err)
|
|
||||||
@emitAsync('apply-target-state-end', err)
|
|
||||||
if intermediate
|
|
||||||
throw err
|
|
||||||
@failedUpdates += 1
|
|
||||||
@reportCurrentState(update_failed: true)
|
|
||||||
if @scheduledApply?
|
|
||||||
if not (err instanceof UpdatesLockedError)
|
|
||||||
log.error("Updating failed, but there's another update scheduled immediately: ", err)
|
|
||||||
else
|
|
||||||
delay = Math.min((2 ** @failedUpdates) * constants.backoffIncrement, @maxPollTime)
|
|
||||||
# If there was an error then schedule another attempt briefly in the future.
|
|
||||||
if err instanceof UpdatesLockedError
|
|
||||||
message = "Updates are locked, retrying in #{prettyMs(delay, compact: true)}..."
|
|
||||||
@logger.logSystemMessage(message, {}, 'updateLocked', false)
|
|
||||||
log.info(message)
|
|
||||||
else
|
|
||||||
log.error("Scheduling another update attempt in #{delay}ms due to failure: ", err)
|
|
||||||
@triggerApplyTarget({ force, delay, initial })
|
|
||||||
|
|
||||||
applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false, nextDelay = 200, retryCount = 0 } = {}) =>
|
|
||||||
Promise.try =>
|
|
||||||
if !intermediate
|
|
||||||
@applyBlocker
|
|
||||||
.then =>
|
|
||||||
@applications.localModeSwitchCompletion()
|
|
||||||
.then =>
|
|
||||||
@usingInferStepsLock =>
|
|
||||||
Promise.all([
|
|
||||||
@getCurrentForComparison()
|
|
||||||
@getTarget({ initial, intermediate })
|
|
||||||
])
|
|
||||||
.then ([ currentState, targetState ]) =>
|
|
||||||
@applications.getExtraStateForComparison(currentState, targetState)
|
|
||||||
.then (extraState) =>
|
|
||||||
@deviceConfig.getRequiredSteps(currentState, targetState)
|
|
||||||
.then (deviceConfigSteps) =>
|
|
||||||
noConfigSteps = _.every(deviceConfigSteps, ({ action }) -> action is 'noop')
|
|
||||||
if not noConfigSteps
|
|
||||||
return [false, deviceConfigSteps]
|
|
||||||
else
|
|
||||||
@applications.getRequiredSteps(currentState, targetState, extraState, intermediate)
|
|
||||||
.then (appSteps) ->
|
|
||||||
# We need to forward to no-ops to the next step if the application state is done
|
|
||||||
# The true and false values below represent whether we should add an exponential
|
|
||||||
# backoff if the steps are all no-ops. the reason that this has to be different
|
|
||||||
# is that a noop step from the application manager means that we should keep running
|
|
||||||
# in a tight loop, waiting for an image to download (for example). The device-config
|
|
||||||
# steps generally have a much higher time to wait before the no-ops will stop, so we
|
|
||||||
# should try to reduce the effect that this will have
|
|
||||||
if _.isEmpty(appSteps) and noConfigSteps
|
|
||||||
return [true, deviceConfigSteps]
|
|
||||||
return [false, appSteps]
|
|
||||||
.then ([backoff, steps]) =>
|
|
||||||
if _.isEmpty(steps)
|
|
||||||
@emitAsync('apply-target-state-end', null)
|
|
||||||
if !intermediate
|
|
||||||
log.debug('Finished applying target state')
|
|
||||||
@applications.timeSpentFetching = 0
|
|
||||||
@failedUpdates = 0
|
|
||||||
@lastSuccessfulUpdate = Date.now()
|
|
||||||
@reportCurrentState(update_failed: false, update_pending: false, update_downloaded: false)
|
|
||||||
return
|
|
||||||
if !intermediate
|
|
||||||
@reportCurrentState(update_pending: true)
|
|
||||||
if _.every(steps, (step) -> step.action == 'noop')
|
|
||||||
if backoff
|
|
||||||
retryCount += 1
|
|
||||||
# Backoff to a maximum of 10 minutes
|
|
||||||
nextDelay = Math.min(2 ** retryCount * 1000, 60 * 10 * 1000)
|
|
||||||
else
|
|
||||||
nextDelay = 1000
|
|
||||||
Promise.map steps, (step) =>
|
|
||||||
@applyStep(step, { force, initial, intermediate, skipLock })
|
|
||||||
.delay(nextDelay)
|
|
||||||
.then =>
|
|
||||||
@applyTarget({ force, initial, intermediate, skipLock, nextDelay, retryCount })
|
|
||||||
.catch (err) =>
|
|
||||||
detailedError = new Error('Failed to apply state transition steps. ' + err.message + ' Steps:' + JSON.stringify(_.map(steps, 'action')))
|
|
||||||
@applyError(detailedError, { force, initial, intermediate })
|
|
||||||
.catch (err) =>
|
|
||||||
@applyError(err, { force, initial, intermediate })
|
|
||||||
|
|
||||||
pausingApply: (fn) =>
|
|
||||||
lock = =>
|
|
||||||
@_writeLock('pause').disposer (release) ->
|
|
||||||
release()
|
|
||||||
pause = =>
|
|
||||||
Promise.try =>
|
|
||||||
res = null
|
|
||||||
@applyBlocker = new Promise (resolve) ->
|
|
||||||
res = resolve
|
|
||||||
return res
|
|
||||||
.disposer (resolve) ->
|
|
||||||
resolve()
|
|
||||||
|
|
||||||
Promise.using lock(), ->
|
|
||||||
Promise.using pause(), ->
|
|
||||||
fn()
|
|
||||||
|
|
||||||
resumeNextApply: =>
|
|
||||||
@applyUnblocker?()
|
|
||||||
return
|
|
||||||
|
|
||||||
triggerApplyTarget: ({ force = false, delay = 0, initial = false, isFromApi = false } = {}) =>
|
|
||||||
if @applyInProgress
|
|
||||||
if !@scheduledApply? || (isFromApi && @cancelDelay)
|
|
||||||
@scheduledApply = { force, delay }
|
|
||||||
if isFromApi
|
|
||||||
# Cancel promise delay if call came from api to
|
|
||||||
# prevent waiting due to backoff (and if we've
|
|
||||||
# previously setup a delay)
|
|
||||||
@cancelDelay?()
|
|
||||||
else
|
|
||||||
# If a delay has been set it's because we need to hold off before applying again,
|
|
||||||
# so we need to respect the maximum delay that has been passed
|
|
||||||
@scheduledApply.delay = Math.max(delay, @scheduledApply.delay)
|
|
||||||
@scheduledApply.force or= force
|
|
||||||
return
|
|
||||||
@applyCancelled = false
|
|
||||||
@applyInProgress = true
|
|
||||||
new Promise (resolve, reject) =>
|
|
||||||
setTimeout(resolve, delay)
|
|
||||||
@cancelDelay = reject
|
|
||||||
.catch =>
|
|
||||||
@applyCancelled = true
|
|
||||||
.then =>
|
|
||||||
@cancelDelay = null
|
|
||||||
if @applyCancelled
|
|
||||||
log.info('Skipping applyTarget because of a cancellation')
|
|
||||||
return
|
|
||||||
@lastApplyStart = process.hrtime()
|
|
||||||
log.info('Applying target state')
|
|
||||||
@applyTarget({ force, initial })
|
|
||||||
.finally =>
|
|
||||||
@applyInProgress = false
|
|
||||||
@reportCurrentState()
|
|
||||||
if @scheduledApply?
|
|
||||||
@triggerApplyTarget(@scheduledApply)
|
|
||||||
@scheduledApply = null
|
|
||||||
return null
|
|
||||||
|
|
||||||
applyIntermediateTarget: (intermediateTarget, { force = false, skipLock = false } = {}) =>
|
|
||||||
@intermediateTarget = _.cloneDeep(intermediateTarget)
|
|
||||||
@applyTarget({ intermediate: true, force, skipLock })
|
|
||||||
.then =>
|
|
||||||
@intermediateTarget = null
|
|
43
src/device-state.d.ts
vendored
43
src/device-state.d.ts
vendored
@ -1,43 +0,0 @@
|
|||||||
import { EventEmitter } from 'events';
|
|
||||||
import { Router } from 'express';
|
|
||||||
|
|
||||||
import ApplicationManager from './application-manager';
|
|
||||||
import Config from './config';
|
|
||||||
import Database from './db';
|
|
||||||
import DeviceConfig from './device-config';
|
|
||||||
import EventTracker from './event-tracker';
|
|
||||||
import Logger from './logger';
|
|
||||||
|
|
||||||
// This is a very incomplete definition of the device state
|
|
||||||
// class, which should be rewritten in typescript soon
|
|
||||||
class DeviceState extends EventEmitter {
|
|
||||||
public applications: ApplicationManager;
|
|
||||||
public router: Router;
|
|
||||||
public deviceConfig: DeviceConfig;
|
|
||||||
public config: Config;
|
|
||||||
public eventTracker: EventTracker;
|
|
||||||
|
|
||||||
// FIXME: I should be removed once device-state is refactored
|
|
||||||
public connected: boolean;
|
|
||||||
|
|
||||||
public constructor(args: {
|
|
||||||
config: Config;
|
|
||||||
db: Database;
|
|
||||||
eventTracker: EventTracker;
|
|
||||||
logger: Logger;
|
|
||||||
});
|
|
||||||
|
|
||||||
public healthcheck(): Promise<void>;
|
|
||||||
public normaliseLegacy(client: PinejsClientRequest): Promise<void>;
|
|
||||||
public loadTargetFromFile(filename: string): Promise<void>;
|
|
||||||
public getTarget(): Promise<any>;
|
|
||||||
public setTarget(target: any): Promise<any>;
|
|
||||||
public triggerApplyTarget(opts: any): Promise<any>;
|
|
||||||
public reportCurrentState(state: any);
|
|
||||||
public getCurrentForComparison(): Promise<any>;
|
|
||||||
public getStatus(): Promise<any>;
|
|
||||||
|
|
||||||
public async init();
|
|
||||||
}
|
|
||||||
|
|
||||||
export = DeviceState;
|
|
882
src/device-state.ts
Normal file
882
src/device-state.ts
Normal file
@ -0,0 +1,882 @@
|
|||||||
|
import * as Bluebird from 'bluebird';
|
||||||
|
import * as bodyParser from 'body-parser';
|
||||||
|
import { EventEmitter } from 'events';
|
||||||
|
import * as express from 'express';
|
||||||
|
import * as _ from 'lodash';
|
||||||
|
import StrictEventEmitter from 'strict-event-emitter-types';
|
||||||
|
|
||||||
|
import prettyMs = require('pretty-ms');
|
||||||
|
|
||||||
|
import Config, { ConfigType } from './config';
|
||||||
|
import Database from './db';
|
||||||
|
import EventTracker from './event-tracker';
|
||||||
|
import Logger from './logger';
|
||||||
|
|
||||||
|
import {
|
||||||
|
CompositionStep,
|
||||||
|
CompositionStepAction,
|
||||||
|
} from './compose/composition-steps';
|
||||||
|
import { loadTargetFromFile } from './device-state/preload';
|
||||||
|
import * as hostConfig from './host-config';
|
||||||
|
import constants = require('./lib/constants');
|
||||||
|
import { InternalInconsistencyError, UpdatesLockedError } from './lib/errors';
|
||||||
|
import * as systemd from './lib/systemd';
|
||||||
|
import * as updateLock from './lib/update-lock';
|
||||||
|
import * as validation from './lib/validation';
|
||||||
|
import * as network from './network';
|
||||||
|
|
||||||
|
import ApplicationManager = require('./application-manager');
|
||||||
|
import DeviceConfig, { ConfigStep } from './device-config';
|
||||||
|
import { log } from './lib/supervisor-console';
|
||||||
|
import {
|
||||||
|
DeviceReportFields,
|
||||||
|
DeviceStatus,
|
||||||
|
InstancedDeviceState,
|
||||||
|
TargetState,
|
||||||
|
} from './types/state';
|
||||||
|
|
||||||
|
function validateLocalState(state: any): asserts state is TargetState['local'] {
|
||||||
|
if (state.name != null) {
|
||||||
|
if (!validation.isValidShortText(state.name)) {
|
||||||
|
throw new Error('Invalid device name');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (state.apps == null || !validation.isValidAppsObject(state.apps)) {
|
||||||
|
throw new Error('Invalid apps');
|
||||||
|
}
|
||||||
|
if (state.config == null || !validation.isValidEnv(state.config)) {
|
||||||
|
throw new Error('Invalid device configuration');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function validateDependentState(
|
||||||
|
state: any,
|
||||||
|
): asserts state is TargetState['dependent'] {
|
||||||
|
if (
|
||||||
|
state.apps != null &&
|
||||||
|
!validation.isValidDependentAppsObject(state.apps)
|
||||||
|
) {
|
||||||
|
throw new Error('Invalid dependent apps');
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
state.devices != null &&
|
||||||
|
!validation.isValidDependentDevicesObject(state.devices)
|
||||||
|
) {
|
||||||
|
throw new Error('Invalid dependent devices');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function validateState(state: any): asserts state is TargetState {
|
||||||
|
if (!_.isObject(state)) {
|
||||||
|
throw new Error('State must be an object');
|
||||||
|
}
|
||||||
|
if (!_.isObject(state.local)) {
|
||||||
|
throw new Error('Local state must be an object');
|
||||||
|
}
|
||||||
|
validateLocalState(state.local);
|
||||||
|
if (state.dependent != null) {
|
||||||
|
return validateDependentState(state.dependent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO (refactor): This shouldn't be here, and instead should be part of the other
|
||||||
|
// device api stuff in ./device-api
|
||||||
|
function createDeviceStateRouter(deviceState: DeviceState) {
|
||||||
|
const router = express.Router();
|
||||||
|
router.use(bodyParser.urlencoded({ limit: '10mb', extended: true }));
|
||||||
|
router.use(bodyParser.json({ limit: '10mb' }));
|
||||||
|
|
||||||
|
const rebootOrShutdown = async (
|
||||||
|
req: express.Request,
|
||||||
|
res: express.Response,
|
||||||
|
action: DeviceStateStepTarget,
|
||||||
|
) => {
|
||||||
|
const override = await deviceState.config.get('lockOverride');
|
||||||
|
const force = validation.checkTruthy(req.body.force) || override;
|
||||||
|
try {
|
||||||
|
const response = await deviceState.executeStepAction(
|
||||||
|
{ action },
|
||||||
|
{ force },
|
||||||
|
);
|
||||||
|
res.status(202).json(response);
|
||||||
|
} catch (e) {
|
||||||
|
const status = e instanceof UpdatesLockedError ? 423 : 500;
|
||||||
|
res.status(status).json({
|
||||||
|
Data: '',
|
||||||
|
Error: (e != null ? e.message : undefined) || e || 'Unknown error',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
router.post('/v1/reboot', (req, res) => rebootOrShutdown(req, res, 'reboot'));
|
||||||
|
router.post('/v1/shutdown', (req, res) =>
|
||||||
|
rebootOrShutdown(req, res, 'shutdown'),
|
||||||
|
);
|
||||||
|
|
||||||
|
router.get('/v1/device/host-config', (_req, res) =>
|
||||||
|
hostConfig
|
||||||
|
.get()
|
||||||
|
.then(conf => res.json(conf))
|
||||||
|
.catch(err =>
|
||||||
|
res.status(503).send(err?.message ?? err ?? 'Unknown error'),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
router.patch('/v1/device/host-config', (req, res) =>
|
||||||
|
hostConfig
|
||||||
|
.patch(req.body, deviceState.config)
|
||||||
|
.then(() => res.status(200).send('OK'))
|
||||||
|
.catch(err =>
|
||||||
|
res.status(503).send(err?.message ?? err ?? 'Unknown error'),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
router.get('/v1/device', async (_req, res) => {
|
||||||
|
try {
|
||||||
|
const state = await deviceState.getStatus();
|
||||||
|
const stateToSend = _.pick(state.local, [
|
||||||
|
'api_port',
|
||||||
|
'ip_address',
|
||||||
|
'os_version',
|
||||||
|
'supervisor_version',
|
||||||
|
'update_pending',
|
||||||
|
'update_failed',
|
||||||
|
'update_downloaded',
|
||||||
|
]) as Dictionary<unknown>;
|
||||||
|
if (state.local?.is_on__commit != null) {
|
||||||
|
stateToSend.commit = state.local.is_on__commit;
|
||||||
|
}
|
||||||
|
const service = _.toPairs(
|
||||||
|
_.toPairs(state.local?.apps)[0]?.[1]?.services,
|
||||||
|
)[0]?.[1];
|
||||||
|
|
||||||
|
if (service != null) {
|
||||||
|
stateToSend.status = service.status;
|
||||||
|
if (stateToSend.status === 'Running') {
|
||||||
|
stateToSend.status = 'Idle';
|
||||||
|
}
|
||||||
|
stateToSend.download_progress = service.download_progress;
|
||||||
|
}
|
||||||
|
res.json(stateToSend);
|
||||||
|
} catch (e) {
|
||||||
|
res.status(500).json({
|
||||||
|
Data: '',
|
||||||
|
Error: (e != null ? e.message : undefined) || e || 'Unknown error',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
router.use(deviceState.applications.router);
|
||||||
|
return router;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface DeviceStateConstructOpts {
|
||||||
|
db: Database;
|
||||||
|
config: Config;
|
||||||
|
eventTracker: EventTracker;
|
||||||
|
logger: Logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface DeviceStateEvents {
|
||||||
|
error: Error;
|
||||||
|
change: void;
|
||||||
|
shutdown: void;
|
||||||
|
'apply-target-state-end': Nullable<Error>;
|
||||||
|
'apply-target-state-error': Error;
|
||||||
|
|
||||||
|
'step-error': (
|
||||||
|
err: Nullable<Error>,
|
||||||
|
step: DeviceStateStep<PossibleStepTargets>,
|
||||||
|
) => void;
|
||||||
|
|
||||||
|
'step-completed': (
|
||||||
|
err: Nullable<Error>,
|
||||||
|
step: DeviceStateStep<PossibleStepTargets>,
|
||||||
|
result?: { Data: string; Error: Nullable<Error> },
|
||||||
|
) => void;
|
||||||
|
}
|
||||||
|
type DeviceStateEventEmitter = StrictEventEmitter<
|
||||||
|
EventEmitter,
|
||||||
|
DeviceStateEvents
|
||||||
|
>;
|
||||||
|
|
||||||
|
type DeviceStateStepTarget = 'reboot' | 'shutdown' | 'noop';
|
||||||
|
|
||||||
|
type PossibleStepTargets = CompositionStepAction | DeviceStateStepTarget;
|
||||||
|
type DeviceStateStep<T extends PossibleStepTargets> =
|
||||||
|
| {
|
||||||
|
action: 'reboot';
|
||||||
|
}
|
||||||
|
| { action: 'shutdown' }
|
||||||
|
| { action: 'noop' }
|
||||||
|
| CompositionStep<T extends CompositionStepAction ? T : never>
|
||||||
|
| ConfigStep;
|
||||||
|
|
||||||
|
export class DeviceState extends (EventEmitter as new () => DeviceStateEventEmitter) {
|
||||||
|
public db: Database;
|
||||||
|
public config: Config;
|
||||||
|
public eventTracker: EventTracker;
|
||||||
|
public logger: Logger;
|
||||||
|
|
||||||
|
public applications: ApplicationManager;
|
||||||
|
public deviceConfig: DeviceConfig;
|
||||||
|
|
||||||
|
private currentVolatile: DeviceReportFields = {};
|
||||||
|
private writeLock = updateLock.writeLock;
|
||||||
|
private readLock = updateLock.readLock;
|
||||||
|
private cancelDelay: null | (() => void) = null;
|
||||||
|
private maxPollTime: number;
|
||||||
|
private intermediateTarget: TargetState | null = null;
|
||||||
|
private applyBlocker: Nullable<Promise<void>>;
|
||||||
|
|
||||||
|
public lastSuccessfulUpdate: number | null = null;
|
||||||
|
public failedUpdates: number = 0;
|
||||||
|
public applyInProgress = false;
|
||||||
|
public applyCancelled = false;
|
||||||
|
public lastApplyStart = process.hrtime();
|
||||||
|
public scheduledApply: { force?: boolean; delay?: number } | null = null;
|
||||||
|
public shuttingDown = false;
|
||||||
|
public connected: boolean;
|
||||||
|
public router: express.Router;
|
||||||
|
|
||||||
|
constructor({ db, config, eventTracker, logger }: DeviceStateConstructOpts) {
|
||||||
|
super();
|
||||||
|
this.db = db;
|
||||||
|
this.config = config;
|
||||||
|
this.eventTracker = eventTracker;
|
||||||
|
this.logger = logger;
|
||||||
|
this.deviceConfig = new DeviceConfig({
|
||||||
|
db: this.db,
|
||||||
|
config: this.config,
|
||||||
|
logger: this.logger,
|
||||||
|
});
|
||||||
|
this.applications = new ApplicationManager({
|
||||||
|
config: this.config,
|
||||||
|
logger: this.logger,
|
||||||
|
db: this.db,
|
||||||
|
eventTracker: this.eventTracker,
|
||||||
|
deviceState: this,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.on('error', err => log.error('deviceState error: ', err));
|
||||||
|
this.on('apply-target-state-end', function(err) {
|
||||||
|
if (err != null) {
|
||||||
|
if (!(err instanceof UpdatesLockedError)) {
|
||||||
|
return log.error('Device state apply error', err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.success('Device state apply success');
|
||||||
|
// We also let the device-config module know that we
|
||||||
|
// successfully reached the target state and that it
|
||||||
|
// should clear any rate limiting it's applied
|
||||||
|
return this.deviceConfig.resetRateLimits();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.applications.on('change', d => this.reportCurrentState(d));
|
||||||
|
this.router = createDeviceStateRouter(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async healthcheck() {
|
||||||
|
const unmanaged = await this.config.get('unmanaged');
|
||||||
|
const cycleTime = process.hrtime(this.lastApplyStart);
|
||||||
|
const cycleTimeMs = cycleTime[0] * 1000 + cycleTime[1] / 1e6;
|
||||||
|
|
||||||
|
const cycleTimeWithinInterval =
|
||||||
|
cycleTimeMs - this.applications.timeSpentFetching < 2 * this.maxPollTime;
|
||||||
|
|
||||||
|
const applyTargetHealthy =
|
||||||
|
unmanaged ||
|
||||||
|
!this.applyInProgress ||
|
||||||
|
this.applications.fetchesInProgress > 0 ||
|
||||||
|
cycleTimeWithinInterval;
|
||||||
|
|
||||||
|
return applyTargetHealthy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async init() {
|
||||||
|
this.config.on('change', changedConfig => {
|
||||||
|
if (changedConfig.loggingEnabled != null) {
|
||||||
|
this.logger.enable(changedConfig.loggingEnabled);
|
||||||
|
}
|
||||||
|
if (changedConfig.apiSecret != null) {
|
||||||
|
this.reportCurrentState({ api_secret: changedConfig.apiSecret });
|
||||||
|
}
|
||||||
|
if (changedConfig.appUpdatePollInterval != null) {
|
||||||
|
this.maxPollTime = changedConfig.appUpdatePollInterval;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const conf = await this.config.getMany([
|
||||||
|
'initialConfigSaved',
|
||||||
|
'listenPort',
|
||||||
|
'apiSecret',
|
||||||
|
'osVersion',
|
||||||
|
'osVariant',
|
||||||
|
'version',
|
||||||
|
'provisioned',
|
||||||
|
'apiEndpoint',
|
||||||
|
'connectivityCheckEnabled',
|
||||||
|
'legacyAppsPresent',
|
||||||
|
'targetStateSet',
|
||||||
|
'unmanaged',
|
||||||
|
'appUpdatePollInterval',
|
||||||
|
]);
|
||||||
|
this.maxPollTime = conf.appUpdatePollInterval;
|
||||||
|
|
||||||
|
await this.applications.init();
|
||||||
|
if (!conf.initialConfigSaved) {
|
||||||
|
return this.saveInitialConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.initNetworkChecks(conf);
|
||||||
|
|
||||||
|
log.info('Reporting initial state, supervisor version and API info');
|
||||||
|
await this.reportCurrentState({
|
||||||
|
api_port: conf.listenPort,
|
||||||
|
api_secret: conf.apiSecret,
|
||||||
|
os_version: conf.osVersion,
|
||||||
|
os_variant: conf.osVariant,
|
||||||
|
supervisor_version: conf.version,
|
||||||
|
provisioning_progress: null,
|
||||||
|
provisioning_state: '',
|
||||||
|
status: 'Idle',
|
||||||
|
logs_channel: null,
|
||||||
|
update_failed: false,
|
||||||
|
update_pending: false,
|
||||||
|
update_downloaded: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const targetApps = await this.applications.getTargetApps();
|
||||||
|
if (!conf.provisioned || (_.isEmpty(targetApps) && !conf.targetStateSet)) {
|
||||||
|
try {
|
||||||
|
await loadTargetFromFile(null, this);
|
||||||
|
} finally {
|
||||||
|
await this.config.set({ targetStateSet: true });
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.debug('Skipping preloading');
|
||||||
|
if (conf.provisioned && !_.isEmpty(targetApps)) {
|
||||||
|
// If we're in this case, it's because we've updated from an older supervisor
|
||||||
|
// and we need to mark that the target state has been set so that
|
||||||
|
// the supervisor doesn't try to preload again if in the future target
|
||||||
|
// apps are empty again (which may happen with multi-app).
|
||||||
|
await this.config.set({ targetStateSet: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await this.triggerApplyTarget({ initial: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
public async initNetworkChecks({
|
||||||
|
apiEndpoint,
|
||||||
|
connectivityCheckEnabled,
|
||||||
|
}: {
|
||||||
|
apiEndpoint: ConfigType<'apiEndpoint'>;
|
||||||
|
connectivityCheckEnabled: ConfigType<'connectivityCheckEnabled'>;
|
||||||
|
}) {
|
||||||
|
network.startConnectivityCheck(
|
||||||
|
apiEndpoint,
|
||||||
|
connectivityCheckEnabled,
|
||||||
|
connected => {
|
||||||
|
return (this.connected = connected);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
this.config.on('change', function(changedConfig) {
|
||||||
|
if (changedConfig.connectivityCheckEnabled != null) {
|
||||||
|
return network.enableConnectivityCheck(
|
||||||
|
changedConfig.connectivityCheckEnabled,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
log.debug('Starting periodic check for IP addresses');
|
||||||
|
|
||||||
|
await network.startIPAddressUpdate()(async addresses => {
|
||||||
|
await this.reportCurrentState({
|
||||||
|
ip_address: addresses.join(' '),
|
||||||
|
});
|
||||||
|
}, constants.ipAddressUpdateInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async saveInitialConfig() {
|
||||||
|
const devConf = await this.deviceConfig.getCurrent();
|
||||||
|
|
||||||
|
await this.deviceConfig.setTarget(devConf);
|
||||||
|
await this.config.set({ initialConfigSaved: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
// We keep compatibility with the StrictEventEmitter types
|
||||||
|
// from the outside, but within this function all hells
|
||||||
|
// breaks loose due to the liberal any casting
|
||||||
|
private emitAsync<T extends keyof DeviceStateEvents>(
|
||||||
|
ev: T,
|
||||||
|
...args: DeviceStateEvents[T] extends (...args: any) => void
|
||||||
|
? Parameters<DeviceStateEvents[T]>
|
||||||
|
: Array<DeviceStateEvents[T]>
|
||||||
|
) {
|
||||||
|
if (_.isArray(args)) {
|
||||||
|
return setImmediate(() => this.emit(ev as any, ...args));
|
||||||
|
} else {
|
||||||
|
return setImmediate(() => this.emit(ev as any, args));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private readLockTarget = () =>
|
||||||
|
this.readLock('target').disposer(release => release());
|
||||||
|
private writeLockTarget = () =>
|
||||||
|
this.writeLock('target').disposer(release => release());
|
||||||
|
private inferStepsLock = () =>
|
||||||
|
this.writeLock('inferSteps').disposer(release => release());
|
||||||
|
private usingReadLockTarget(fn: () => any) {
|
||||||
|
return Bluebird.using(this.readLockTarget, () => fn());
|
||||||
|
}
|
||||||
|
private usingWriteLockTarget(fn: () => any) {
|
||||||
|
return Bluebird.using(this.writeLockTarget, () => fn());
|
||||||
|
}
|
||||||
|
private usingInferStepsLock(fn: () => any) {
|
||||||
|
return Bluebird.using(this.inferStepsLock, () => fn());
|
||||||
|
}
|
||||||
|
|
||||||
|
public async setTarget(target: TargetState, localSource?: boolean) {
|
||||||
|
// When we get a new target state, clear any built up apply errors
|
||||||
|
// This means that we can attempt to apply the new state instantly
|
||||||
|
if (localSource == null) {
|
||||||
|
localSource = false;
|
||||||
|
}
|
||||||
|
this.failedUpdates = 0;
|
||||||
|
|
||||||
|
validateState(target);
|
||||||
|
const apiEndpoint = await this.config.get('apiEndpoint');
|
||||||
|
|
||||||
|
await this.usingWriteLockTarget(async () => {
|
||||||
|
await this.db.transaction(async trx => {
|
||||||
|
await this.config.set({ name: target.local.name }, trx);
|
||||||
|
await this.deviceConfig.setTarget(target.local.config, trx);
|
||||||
|
|
||||||
|
if (localSource || apiEndpoint == null) {
|
||||||
|
await this.applications.setTarget(
|
||||||
|
target.local.apps,
|
||||||
|
target.dependent,
|
||||||
|
'local',
|
||||||
|
trx,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
await this.applications.setTarget(
|
||||||
|
target.local.apps,
|
||||||
|
target.dependent,
|
||||||
|
apiEndpoint,
|
||||||
|
trx,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public getTarget({
|
||||||
|
initial = false,
|
||||||
|
intermediate = false,
|
||||||
|
}: { initial?: boolean; intermediate?: boolean } = {}): Bluebird<
|
||||||
|
InstancedDeviceState
|
||||||
|
> {
|
||||||
|
return this.usingReadLockTarget(async () => {
|
||||||
|
if (intermediate) {
|
||||||
|
return this.intermediateTarget;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
local: {
|
||||||
|
name: await this.config.get('name'),
|
||||||
|
config: await this.deviceConfig.getTarget({ initial }),
|
||||||
|
apps: await this.applications.getTargetApps(),
|
||||||
|
},
|
||||||
|
dependent: await this.applications.getDependentTargets(),
|
||||||
|
};
|
||||||
|
}) as Bluebird<InstancedDeviceState>;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getStatus(): Promise<DeviceStatus> {
|
||||||
|
const appsStatus = await this.applications.getStatus();
|
||||||
|
const theState: DeepPartial<DeviceStatus> = {
|
||||||
|
local: {},
|
||||||
|
dependent: {},
|
||||||
|
};
|
||||||
|
theState.local = { ...theState.local, ...this.currentVolatile };
|
||||||
|
theState.local.apps = appsStatus.local;
|
||||||
|
theState.dependent!.apps = appsStatus.dependent;
|
||||||
|
if (appsStatus.commit && !this.applyInProgress) {
|
||||||
|
theState.local.is_on__commit = appsStatus.commit;
|
||||||
|
}
|
||||||
|
|
||||||
|
return theState as DeviceStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getCurrentForComparison(): Promise<
|
||||||
|
DeviceStatus & { local: { name: string } }
|
||||||
|
> {
|
||||||
|
const [name, devConfig, apps, dependent] = await Promise.all([
|
||||||
|
this.config.get('name'),
|
||||||
|
this.deviceConfig.getCurrent(),
|
||||||
|
this.applications.getCurrentForComparison(),
|
||||||
|
this.applications.getDependentState(),
|
||||||
|
]);
|
||||||
|
return {
|
||||||
|
local: {
|
||||||
|
name,
|
||||||
|
config: devConfig,
|
||||||
|
apps,
|
||||||
|
},
|
||||||
|
|
||||||
|
dependent,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public reportCurrentState(newState: DeviceReportFields = {}) {
|
||||||
|
if (newState == null) {
|
||||||
|
newState = {};
|
||||||
|
}
|
||||||
|
this.currentVolatile = { ...this.currentVolatile, ...newState };
|
||||||
|
return this.emitAsync('change', undefined);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async reboot(force?: boolean, skipLock?: boolean) {
|
||||||
|
await this.applications.stopAll({ force, skipLock });
|
||||||
|
this.logger.logSystemMessage('Rebooting', {}, 'Reboot');
|
||||||
|
const reboot = await systemd.reboot();
|
||||||
|
this.shuttingDown = true;
|
||||||
|
this.emitAsync('shutdown', undefined);
|
||||||
|
return reboot;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async shutdown(force?: boolean, skipLock?: boolean) {
|
||||||
|
await this.applications.stopAll({ force, skipLock });
|
||||||
|
this.logger.logSystemMessage('Shutting down', {}, 'Shutdown');
|
||||||
|
const shutdown = await systemd.shutdown();
|
||||||
|
this.shuttingDown = true;
|
||||||
|
this.emitAsync('shutdown', undefined);
|
||||||
|
return shutdown;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async executeStepAction<T extends PossibleStepTargets>(
|
||||||
|
step: DeviceStateStep<T>,
|
||||||
|
{
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
skipLock,
|
||||||
|
}: { force?: boolean; initial?: boolean; skipLock?: boolean },
|
||||||
|
) {
|
||||||
|
if (this.deviceConfig.isValidAction(step.action)) {
|
||||||
|
await this.deviceConfig.executeStepAction(step as ConfigStep, {
|
||||||
|
initial,
|
||||||
|
});
|
||||||
|
} else if (_.includes(this.applications.validActions, step.action)) {
|
||||||
|
return this.applications.executeStepAction(step as any, {
|
||||||
|
force,
|
||||||
|
skipLock,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
switch (step.action) {
|
||||||
|
case 'reboot':
|
||||||
|
// There isn't really a way that these methods can fail,
|
||||||
|
// and if they do, we wouldn't know about it until after
|
||||||
|
// the response has been sent back to the API. Just return
|
||||||
|
// "OK" for this and the below action
|
||||||
|
await this.reboot(force, skipLock);
|
||||||
|
return {
|
||||||
|
Data: 'OK',
|
||||||
|
Error: null,
|
||||||
|
};
|
||||||
|
case 'shutdown':
|
||||||
|
await this.shutdown(force, skipLock);
|
||||||
|
return {
|
||||||
|
Data: 'OK',
|
||||||
|
Error: null,
|
||||||
|
};
|
||||||
|
case 'noop':
|
||||||
|
return;
|
||||||
|
default:
|
||||||
|
throw new Error(`Invalid action ${step.action}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async applyStep<T extends PossibleStepTargets>(
|
||||||
|
step: DeviceStateStep<T>,
|
||||||
|
{
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
skipLock,
|
||||||
|
}: {
|
||||||
|
force?: boolean;
|
||||||
|
initial?: boolean;
|
||||||
|
skipLock?: boolean;
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
if (this.shuttingDown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const stepResult = await this.executeStepAction(step, {
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
skipLock,
|
||||||
|
});
|
||||||
|
this.emitAsync('step-completed', null, step, stepResult || undefined);
|
||||||
|
} catch (e) {
|
||||||
|
this.emitAsync('step-error', e, step);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private applyError(
|
||||||
|
err: Error,
|
||||||
|
{
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
intermediate,
|
||||||
|
}: { force?: boolean; initial?: boolean; intermediate?: boolean },
|
||||||
|
) {
|
||||||
|
this.emitAsync('apply-target-state-error', err);
|
||||||
|
this.emitAsync('apply-target-state-end', err);
|
||||||
|
if (intermediate) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
this.failedUpdates += 1;
|
||||||
|
this.reportCurrentState({ update_failed: true });
|
||||||
|
if (this.scheduledApply != null) {
|
||||||
|
if (!(err instanceof UpdatesLockedError)) {
|
||||||
|
log.error(
|
||||||
|
"Updating failed, but there's another update scheduled immediately: ",
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const delay = Math.min(
|
||||||
|
Math.pow(2, this.failedUpdates) * constants.backoffIncrement,
|
||||||
|
this.maxPollTime,
|
||||||
|
);
|
||||||
|
// If there was an error then schedule another attempt briefly in the future.
|
||||||
|
if (err instanceof UpdatesLockedError) {
|
||||||
|
const message = `Updates are locked, retrying in ${prettyMs(delay, {
|
||||||
|
compact: true,
|
||||||
|
})}...`;
|
||||||
|
this.logger.logSystemMessage(message, {}, 'updateLocked', false);
|
||||||
|
log.info(message);
|
||||||
|
} else {
|
||||||
|
log.error(
|
||||||
|
`Scheduling another update attempt in ${delay}ms due to failure: `,
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return this.triggerApplyTarget({ force, delay, initial });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async applyTarget({
|
||||||
|
force = false,
|
||||||
|
initial = false,
|
||||||
|
intermediate = false,
|
||||||
|
skipLock = false,
|
||||||
|
nextDelay = 200,
|
||||||
|
retryCount = 0,
|
||||||
|
} = {}) {
|
||||||
|
if (!intermediate) {
|
||||||
|
await this.applyBlocker;
|
||||||
|
}
|
||||||
|
await this.applications.localModeSwitchCompletion();
|
||||||
|
|
||||||
|
return this.usingInferStepsLock(async () => {
|
||||||
|
const [currentState, targetState] = await Promise.all([
|
||||||
|
this.getCurrentForComparison(),
|
||||||
|
this.getTarget({ initial, intermediate }),
|
||||||
|
]);
|
||||||
|
const extraState = await this.applications.getExtraStateForComparison(
|
||||||
|
currentState,
|
||||||
|
targetState,
|
||||||
|
);
|
||||||
|
const deviceConfigSteps = await this.deviceConfig.getRequiredSteps(
|
||||||
|
currentState,
|
||||||
|
targetState,
|
||||||
|
);
|
||||||
|
const noConfigSteps = _.every(
|
||||||
|
deviceConfigSteps,
|
||||||
|
({ action }) => action === 'noop',
|
||||||
|
);
|
||||||
|
|
||||||
|
let backoff: boolean;
|
||||||
|
let steps: Array<DeviceStateStep<PossibleStepTargets>>;
|
||||||
|
|
||||||
|
if (!noConfigSteps) {
|
||||||
|
backoff = false;
|
||||||
|
steps = deviceConfigSteps;
|
||||||
|
} else {
|
||||||
|
const appSteps = await this.applications.getRequiredSteps(
|
||||||
|
currentState,
|
||||||
|
targetState,
|
||||||
|
extraState,
|
||||||
|
intermediate,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (_.isEmpty(appSteps)) {
|
||||||
|
// If we retrieve a bunch of no-ops from the
|
||||||
|
// device config, generally we want to back off
|
||||||
|
// more than if we retrieve them from the
|
||||||
|
// application manager
|
||||||
|
backoff = true;
|
||||||
|
steps = deviceConfigSteps;
|
||||||
|
} else {
|
||||||
|
backoff = false;
|
||||||
|
steps = appSteps;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_.isEmpty(steps)) {
|
||||||
|
this.emitAsync('apply-target-state-end', null);
|
||||||
|
if (!intermediate) {
|
||||||
|
log.debug('Finished applying target state');
|
||||||
|
this.applications.timeSpentFetching = 0;
|
||||||
|
this.failedUpdates = 0;
|
||||||
|
this.lastSuccessfulUpdate = Date.now();
|
||||||
|
this.reportCurrentState({
|
||||||
|
update_failed: false,
|
||||||
|
update_pending: false,
|
||||||
|
update_downloaded: false,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!intermediate) {
|
||||||
|
this.reportCurrentState({ update_pending: true });
|
||||||
|
}
|
||||||
|
if (_.every(steps, step => step.action === 'noop')) {
|
||||||
|
if (backoff) {
|
||||||
|
retryCount += 1;
|
||||||
|
// Backoff to a maximum of 10 minutes
|
||||||
|
nextDelay = Math.min(Math.pow(2, retryCount) * 1000, 60 * 10 * 1000);
|
||||||
|
} else {
|
||||||
|
nextDelay = 1000;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await Promise.all(
|
||||||
|
steps.map(s => this.applyStep(s, { force, initial, skipLock })),
|
||||||
|
);
|
||||||
|
|
||||||
|
await Bluebird.delay(nextDelay);
|
||||||
|
await this.applyTarget({
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
intermediate,
|
||||||
|
skipLock,
|
||||||
|
nextDelay,
|
||||||
|
retryCount,
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
const detailedError = new Error(
|
||||||
|
'Failed to apply state transition steps. ' +
|
||||||
|
e.message +
|
||||||
|
' Steps:' +
|
||||||
|
JSON.stringify(_.map(steps, 'action')),
|
||||||
|
);
|
||||||
|
return this.applyError(detailedError, {
|
||||||
|
force,
|
||||||
|
initial,
|
||||||
|
intermediate,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}).catch(err => {
|
||||||
|
return this.applyError(err, { force, initial, intermediate });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public pausingApply(fn: () => any) {
|
||||||
|
const lock = () => {
|
||||||
|
return this.writeLock('pause').disposer(release => release());
|
||||||
|
};
|
||||||
|
// TODO: This function is a bit of a mess
|
||||||
|
const pause = () => {
|
||||||
|
return Bluebird.try(() => {
|
||||||
|
let res = null;
|
||||||
|
this.applyBlocker = new Promise(resolve => {
|
||||||
|
res = resolve;
|
||||||
|
});
|
||||||
|
return res;
|
||||||
|
}).disposer((resolve: any) => resolve());
|
||||||
|
};
|
||||||
|
|
||||||
|
return Bluebird.using(lock(), () => Bluebird.using(pause(), () => fn()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public triggerApplyTarget({
|
||||||
|
force = false,
|
||||||
|
delay = 0,
|
||||||
|
initial = false,
|
||||||
|
isFromApi = false,
|
||||||
|
} = {}) {
|
||||||
|
if (this.applyInProgress) {
|
||||||
|
if (this.scheduledApply == null || (isFromApi && this.cancelDelay)) {
|
||||||
|
this.scheduledApply = { force, delay };
|
||||||
|
if (isFromApi) {
|
||||||
|
// Cancel promise delay if call came from api to
|
||||||
|
// prevent waiting due to backoff (and if we've
|
||||||
|
// previously setup a delay)
|
||||||
|
this.cancelDelay?.();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If a delay has been set it's because we need to hold off before applying again,
|
||||||
|
// so we need to respect the maximum delay that has
|
||||||
|
// been passed
|
||||||
|
if (!this.scheduledApply.delay) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
'No delay specified in scheduledApply',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.scheduledApply.delay = Math.max(delay, this.scheduledApply.delay);
|
||||||
|
if (!this.scheduledApply.force) {
|
||||||
|
this.scheduledApply.force = force;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.applyCancelled = false;
|
||||||
|
this.applyInProgress = true;
|
||||||
|
new Bluebird((resolve, reject) => {
|
||||||
|
setTimeout(resolve, delay);
|
||||||
|
this.cancelDelay = reject;
|
||||||
|
})
|
||||||
|
.catch(() => {
|
||||||
|
this.applyCancelled = true;
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
this.cancelDelay = null;
|
||||||
|
if (this.applyCancelled) {
|
||||||
|
log.info('Skipping applyTarget because of a cancellation');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.lastApplyStart = process.hrtime();
|
||||||
|
log.info('Applying target state');
|
||||||
|
return this.applyTarget({ force, initial });
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
this.applyInProgress = false;
|
||||||
|
this.reportCurrentState();
|
||||||
|
if (this.scheduledApply != null) {
|
||||||
|
this.triggerApplyTarget(this.scheduledApply);
|
||||||
|
this.scheduledApply = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public applyIntermediateTarget(
|
||||||
|
intermediateTarget: TargetState,
|
||||||
|
{ force = false, skipLock = false } = {},
|
||||||
|
) {
|
||||||
|
this.intermediateTarget = _.cloneDeep(intermediateTarget);
|
||||||
|
return this.applyTarget({ intermediate: true, force, skipLock }).then(
|
||||||
|
() => {
|
||||||
|
this.intermediateTarget = null;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default DeviceState;
|
@ -2,7 +2,7 @@ import * as _ from 'lodash';
|
|||||||
import { fs } from 'mz';
|
import { fs } from 'mz';
|
||||||
|
|
||||||
import { Image } from '../compose/images';
|
import { Image } from '../compose/images';
|
||||||
import DeviceState = require('../device-state');
|
import DeviceState from '../device-state';
|
||||||
|
|
||||||
import constants = require('../lib/constants');
|
import constants = require('../lib/constants');
|
||||||
import { AppsJsonParseError, EISDIR, ENOENT } from '../lib/errors';
|
import { AppsJsonParseError, EISDIR, ENOENT } from '../lib/errors';
|
||||||
|
@ -9,10 +9,10 @@ import * as rimraf from 'rimraf';
|
|||||||
const mkdirpAsync = Bluebird.promisify(mkdirp);
|
const mkdirpAsync = Bluebird.promisify(mkdirp);
|
||||||
const rimrafAsync = Bluebird.promisify(rimraf);
|
const rimrafAsync = Bluebird.promisify(rimraf);
|
||||||
|
|
||||||
import ApplicationManager from '../application-manager';
|
import ApplicationManager = require('../application-manager');
|
||||||
import Config from '../config';
|
import Config from '../config';
|
||||||
import Database, { Transaction } from '../db';
|
import Database, { Transaction } from '../db';
|
||||||
import DeviceState = require('../device-state');
|
import DeviceState from '../device-state';
|
||||||
import * as constants from '../lib/constants';
|
import * as constants from '../lib/constants';
|
||||||
import { BackupError, DatabaseParseError, NotFoundError } from '../lib/errors';
|
import { BackupError, DatabaseParseError, NotFoundError } from '../lib/errors';
|
||||||
import { pathExistsOnHost } from '../lib/fs-utils';
|
import { pathExistsOnHost } from '../lib/fs-utils';
|
||||||
|
@ -1,13 +1,12 @@
|
|||||||
import APIBinder from './api-binder';
|
import APIBinder from './api-binder';
|
||||||
import Config, { ConfigKey } from './config';
|
import Config, { ConfigKey } from './config';
|
||||||
import Database from './db';
|
import Database from './db';
|
||||||
|
import DeviceState from './device-state';
|
||||||
import EventTracker from './event-tracker';
|
import EventTracker from './event-tracker';
|
||||||
import { normaliseLegacyDatabase } from './lib/migration';
|
import { normaliseLegacyDatabase } from './lib/migration';
|
||||||
import Logger from './logger';
|
import Logger from './logger';
|
||||||
import SupervisorAPI from './supervisor-api';
|
import SupervisorAPI from './supervisor-api';
|
||||||
|
|
||||||
import DeviceState = require('./device-state');
|
|
||||||
|
|
||||||
import constants = require('./lib/constants');
|
import constants = require('./lib/constants');
|
||||||
import log from './lib/supervisor-console';
|
import log from './lib/supervisor-console';
|
||||||
import version = require('./lib/supervisor-version');
|
import version = require('./lib/supervisor-version');
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
|
|
||||||
import ApplicationManager from './application-manager';
|
import ApplicationManager = require('./application-manager');
|
||||||
import Config from './config';
|
import Config from './config';
|
||||||
import Database, { Transaction } from './db';
|
import Database, { Transaction } from './db';
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ const { expect } = chai;
|
|||||||
import Config from '../src/config';
|
import Config from '../src/config';
|
||||||
import { RPiConfigBackend } from '../src/config/backend';
|
import { RPiConfigBackend } from '../src/config/backend';
|
||||||
import DB from '../src/db';
|
import DB from '../src/db';
|
||||||
import DeviceState = require('../src/device-state');
|
import DeviceState from '../src/device-state';
|
||||||
|
|
||||||
import { loadTargetFromFile } from '../src/device-state/preload';
|
import { loadTargetFromFile } from '../src/device-state/preload';
|
||||||
|
|
||||||
@ -112,6 +112,8 @@ const testTarget2 = {
|
|||||||
labels: {},
|
labels: {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
volumes: {},
|
||||||
|
networks: {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -310,7 +312,7 @@ describe('deviceState', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('emits a change event when a new state is reported', () => {
|
it('emits a change event when a new state is reported', () => {
|
||||||
deviceState.reportCurrentState({ someStateDiff: 'someValue' });
|
deviceState.reportCurrentState({ someStateDiff: 'someValue' } as any);
|
||||||
return (expect as any)(deviceState).to.emit('change');
|
return (expect as any)(deviceState).to.emit('change');
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -339,7 +341,7 @@ describe('deviceState', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('does not allow setting an invalid target state', () => {
|
it('does not allow setting an invalid target state', () => {
|
||||||
expect(deviceState.setTarget(testTargetInvalid)).to.be.rejected;
|
expect(deviceState.setTarget(testTargetInvalid as any)).to.be.rejected;
|
||||||
});
|
});
|
||||||
|
|
||||||
it('allows triggering applying the target state', done => {
|
it('allows triggering applying the target state', done => {
|
||||||
|
@ -11,7 +11,7 @@ const { expect } = chai;
|
|||||||
import ApiBinder from '../src/api-binder';
|
import ApiBinder from '../src/api-binder';
|
||||||
import Config from '../src/config';
|
import Config from '../src/config';
|
||||||
import DB from '../src/db';
|
import DB from '../src/db';
|
||||||
import DeviceState = require('../src/device-state');
|
import DeviceState from '../src/device-state';
|
||||||
|
|
||||||
const initModels = async (obj: Dictionary<any>, filename: string) => {
|
const initModels = async (obj: Dictionary<any>, filename: string) => {
|
||||||
prepare();
|
prepare();
|
||||||
|
@ -7,7 +7,7 @@ chai.use(require('chai-events'))
|
|||||||
{ expect } = chai
|
{ expect } = chai
|
||||||
|
|
||||||
prepare = require './lib/prepare'
|
prepare = require './lib/prepare'
|
||||||
DeviceState = require '../src/device-state'
|
{ DeviceState } = require '../src/device-state'
|
||||||
{ DB } = require('../src/db')
|
{ DB } = require('../src/db')
|
||||||
{ Config } = require('../src/config')
|
{ Config } = require('../src/config')
|
||||||
{ Service } = require '../src/compose/service'
|
{ Service } = require '../src/compose/service'
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import { expect } from 'chai';
|
import { expect } from 'chai';
|
||||||
import { APIBinder, APIBinderConstructOpts } from '../src/api-binder';
|
import { APIBinder, APIBinderConstructOpts } from '../src/api-binder';
|
||||||
import { DeviceApplicationState } from '../src/types/state';
|
import { DeviceStatus } from '../src/types/state';
|
||||||
|
|
||||||
describe('APIBinder', () => {
|
describe('APIBinder', () => {
|
||||||
let apiBinder: APIBinder;
|
let apiBinder: APIBinder;
|
||||||
@ -27,7 +27,7 @@ describe('APIBinder', () => {
|
|||||||
is_on__commit: 'whatever',
|
is_on__commit: 'whatever',
|
||||||
},
|
},
|
||||||
dependent: { apps: {} },
|
dependent: { apps: {} },
|
||||||
} as DeviceApplicationState;
|
} as DeviceStatus;
|
||||||
|
|
||||||
it('should strip applications data', () => {
|
it('should strip applications data', () => {
|
||||||
const result = apiBinder.stripDeviceStateInLocalMode(
|
const result = apiBinder.stripDeviceStateInLocalMode(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user