mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-01-01 19:46:44 +00:00
Implement v2 API endpoints to restart and purge apps, and restart a service
This also changes the deviceState object to use promises instead of timeouts to schedule applying the target state. Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
This commit is contained in:
parent
f653fa4961
commit
5ec8e57aa0
@ -44,11 +44,11 @@ fetchAction = (service) ->
|
||||
image: imageForService(service)
|
||||
serviceId: service.serviceId
|
||||
}
|
||||
# TODO: implement v2 endpoints
|
||||
# TODO: implement additional v2 endpoints
|
||||
# v1 endpoins only work for single-container apps as they assume the app has a single service.
|
||||
class ApplicationManagerRouter
|
||||
constructor: (@applications) ->
|
||||
{ @proxyvisor, @eventTracker } = @applications
|
||||
{ @proxyvisor, @eventTracker, @deviceState, @_lockingIfNecessary } = @applications
|
||||
@router = express.Router()
|
||||
@router.use(bodyParser.urlencoded(extended: true))
|
||||
@router.use(bodyParser.json())
|
||||
@ -165,10 +165,71 @@ class ApplicationManagerRouter
|
||||
res.status(200).json(Data: 'OK', Error: '')
|
||||
.catch (err) ->
|
||||
res.status(503).send(err?.message or err or 'Unknown error')
|
||||
|
||||
@router.post '/v2/applications/:appId/purge', (req, res) =>
|
||||
{ force } = req.body
|
||||
{ appId } = req.params
|
||||
@_lockingIfNecessary appId, { force }, =>
|
||||
@deviceState.getCurrentForComparison()
|
||||
.then (currentState) =>
|
||||
app = currentState.local.apps[appId]
|
||||
purgedApp = _.cloneDeep(app)
|
||||
purgedApp.services = []
|
||||
purgedApp.volumes = {}
|
||||
currentState.local.apps[appId] = purgedApp
|
||||
@deviceState.applyIntermediateTarget(currentState, { skipLock: true })
|
||||
.then =>
|
||||
currentState.local.apps[appId] = app
|
||||
@deviceState.applyIntermediateTarget(currentState, { skipLock: true })
|
||||
.then ->
|
||||
res.status(200).send('OK')
|
||||
.catch (err) ->
|
||||
res.status(503).send(err?.message or err or 'Unknown error')
|
||||
|
||||
@router.post '/v2/applications/:appId/restart-service', (req, res) =>
|
||||
{ imageId, force } = req.body
|
||||
{ appId } = req.params
|
||||
@_lockingIfNecessary appId, { force }, =>
|
||||
@applications.getCurrentApp(appId)
|
||||
.then (app) =>
|
||||
if !app?
|
||||
errMsg = "App not found: an app needs to be installed for purge to work.
|
||||
If you've recently moved this device from another app,
|
||||
please push an app and wait for it to be installed first."
|
||||
return res.status(404).send(errMsg)
|
||||
service = _.find(app.services, (s) -> s.imageId == imageId)
|
||||
if !service?
|
||||
errMsg = 'Service not found, a container must exist for service restart to work.'
|
||||
return res.status(404).send(errMsg)
|
||||
@applications.executeStepAction(serviceAction('restart', service.serviceId, service, service), { skipLock: true })
|
||||
.then ->
|
||||
res.status(200).send('OK')
|
||||
.catch (err) ->
|
||||
res.status(503).send(err?.message or err or 'Unknown error')
|
||||
|
||||
@router.post '/v2/applications/:appId/restart', (req, res) =>
|
||||
{ force } = req.body
|
||||
{ appId } = req.params
|
||||
@_lockingIfNecessary appId, { force }, =>
|
||||
@deviceState.getCurrentForComparison()
|
||||
.then (currentState) =>
|
||||
app = currentState.local.apps[appId]
|
||||
stoppedApp = _.cloneDeep(app)
|
||||
stoppedApp.services = []
|
||||
currentState.local.apps[appId] = stoppedApp
|
||||
@deviceState.applyIntermediateTarget(currentState, { skipLock: true })
|
||||
.then =>
|
||||
currentState.local.apps[appId] = app
|
||||
@deviceState.applyIntermediateTarget(currentState, { skipLock: true })
|
||||
.then ->
|
||||
res.status(200).send('OK')
|
||||
.catch (err) ->
|
||||
res.status(503).send(err?.message or err or 'Unknown error')
|
||||
|
||||
@router.use(@proxyvisor.router)
|
||||
|
||||
module.exports = class ApplicationManager extends EventEmitter
|
||||
constructor: ({ @logger, @config, @db, @eventTracker }) ->
|
||||
constructor: ({ @logger, @config, @db, @eventTracker, @deviceState }) ->
|
||||
@docker = new Docker()
|
||||
@images = new Images({ @docker, @logger, @db })
|
||||
@services = new ServiceManager({ @docker, @logger, @images, @config })
|
||||
@ -179,21 +240,21 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
@fetchesInProgress = 0
|
||||
@_targetVolatilePerServiceId = {}
|
||||
@actionExecutors = {
|
||||
stop: (step, { force = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, =>
|
||||
stop: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@services.kill(step.current, { removeContainer: false })
|
||||
kill: (step, { force = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, =>
|
||||
kill: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@services.kill(step.current)
|
||||
.then =>
|
||||
if step.options?.removeImage
|
||||
@images.removeByDockerId(step.current.image)
|
||||
updateMetadata: (step) =>
|
||||
@services.updateMetadata(step.current, step.target)
|
||||
purge: (step, { force = false } = {}) =>
|
||||
purge: (step, { force = false, skipLock = false } = {}) =>
|
||||
appId = step.appId
|
||||
@logger.logSystemMessage("Purging data for app #{appId}", { appId }, 'Purge data')
|
||||
@_lockingIfNecessary appId, { force, skipLock: step.options?.skipLock }, =>
|
||||
@_lockingIfNecessary appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@getCurrentApp(appId)
|
||||
.then (app) =>
|
||||
if !_.isEmpty(app?.services)
|
||||
@ -210,18 +271,18 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
.catch (err) =>
|
||||
@logger.logSystemMessage("Error purging data: #{err}", { appId, error: err }, 'Purge data error')
|
||||
throw err
|
||||
restart: (step, { force = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, =>
|
||||
restart: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
Promise.try =>
|
||||
@services.kill(step.current)
|
||||
.then =>
|
||||
@services.start(step.target)
|
||||
stopAll: (step, { force = false } = {}) =>
|
||||
@stopAll({ force })
|
||||
stopAll: (step, { force = false, skipLock = false } = {}) =>
|
||||
@stopAll({ force, skipLock })
|
||||
start: (step) =>
|
||||
@services.start(step.target)
|
||||
handover: (step, { force = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, =>
|
||||
handover: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@services.handover(step.current, step.target)
|
||||
fetch: (step) =>
|
||||
startTime = process.hrtime()
|
||||
@ -327,29 +388,26 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
@proxyvisor.getCurrentStates()
|
||||
|
||||
_buildApps: (services, networks, volumes) ->
|
||||
apps = _.keyBy(_.map(_.uniq(_.map(services, 'appId')), (appId) -> { appId }), 'appId')
|
||||
apps = {}
|
||||
|
||||
# We iterate over the current running services and add them to the current state
|
||||
# of the app they belong to.
|
||||
for service in services
|
||||
appId = service.appId
|
||||
apps[appId].services ?= []
|
||||
apps[appId] ?= { appId, services: [], volumes: {}, networks: {} }
|
||||
apps[appId].services.push(service)
|
||||
|
||||
for network in networks
|
||||
appId = network.appId
|
||||
apps[appId] ?= { appId }
|
||||
apps[appId].networks ?= {}
|
||||
apps[appId] ?= { appId, services: [], volumes: {}, networks: {} }
|
||||
apps[appId].networks[network.name] = network.config
|
||||
|
||||
for volume in volumes
|
||||
appId = volume.appId
|
||||
apps[appId] ?= { appId }
|
||||
apps[appId].volumes ?= {}
|
||||
apps[appId] ?= { appId, services: [], volumes: {}, networks: {} }
|
||||
apps[appId].volumes[volume.name] = volume.config
|
||||
|
||||
# We return the apps as an array
|
||||
return _.values(apps)
|
||||
return apps
|
||||
|
||||
getCurrentForComparison: =>
|
||||
Promise.join(
|
||||
@ -366,7 +424,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
@networks.getAllByAppId(appId)
|
||||
@volumes.getAllByAppId(appId)
|
||||
(services, networks, volumes) =>
|
||||
return @_buildApps(services, networks, volumes)[0]
|
||||
return @_buildApps(services, networks, volumes)[appId]
|
||||
)
|
||||
|
||||
getTargetApp: (appId) =>
|
||||
@ -621,7 +679,8 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
# There is already a step in progress for this service, so we wait
|
||||
return null
|
||||
|
||||
needsDownload = !_.some(availableImages, (image) => @images.isSameImage(image, { name: target.imageName }))
|
||||
needsDownload = !_.some availableImages, (image) =>
|
||||
image.dockerImageId == target.image or @images.isSameImage(image, { name: target.imageName })
|
||||
dependenciesMetForStart = =>
|
||||
@_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs), stepsInProgress)
|
||||
dependenciesMetForKill = =>
|
||||
@ -788,6 +847,8 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
_.merge(service, @_targetVolatilePerServiceId[service.serviceId])
|
||||
return service
|
||||
return app
|
||||
.then (apps) ->
|
||||
return _.keyBy(apps, 'appId')
|
||||
|
||||
getDependentTargets: =>
|
||||
@proxyvisor.getTarget()
|
||||
@ -840,15 +901,15 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
return notUsedForDelta and notUsedByProxyvisor
|
||||
return { imagesToSave, imagesToRemove }
|
||||
|
||||
_inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress) =>
|
||||
_inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress, ignoreImages) =>
|
||||
Promise.try =>
|
||||
currentByAppId = _.keyBy(current.local.apps ? [], 'appId')
|
||||
targetByAppId = _.keyBy(target.local.apps ? [], 'appId')
|
||||
currentByAppId = current.local.apps ? {}
|
||||
targetByAppId = target.local.apps ? {}
|
||||
nextSteps = []
|
||||
if !supervisorNetworkReady
|
||||
nextSteps.push({ action: 'ensureSupervisorNetwork' })
|
||||
else
|
||||
if !_.some(stepsInProgress, (step) -> step.action == 'fetch')
|
||||
if !ignoreImages and !_.some(stepsInProgress, (step) -> step.action == 'fetch')
|
||||
if cleanupNeeded
|
||||
nextSteps.push({ action: 'cleanup' })
|
||||
{ imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages)
|
||||
@ -869,36 +930,38 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
!_.find(stepsInProgress, (s) -> _.isEqual(s, step))?
|
||||
_.uniqWith(withoutProgressDups, _.isEqual)
|
||||
|
||||
stopAll: ({ force = false } = {}) =>
|
||||
stopAll: ({ force = false, skipLock = false } = {}) =>
|
||||
@services.getAll()
|
||||
.map (service) =>
|
||||
@_lockingIfNecessary service.appId, { force }, =>
|
||||
@_lockingIfNecessary service.appId, { force, skipLock }, =>
|
||||
@services.kill(service, { removeContainer: false })
|
||||
|
||||
_lockingIfNecessary: (appId, { force = false, skipLock = false } = {}, fn) =>
|
||||
if skipLock
|
||||
return Promise.resolve()
|
||||
return Promise.try( -> fn())
|
||||
@config.get('lockOverride')
|
||||
.then (lockOverride) ->
|
||||
return checkTruthy(lockOverride) or force
|
||||
.then (force) ->
|
||||
updateLock.lock(appId, { force }, fn)
|
||||
|
||||
executeStepAction: (step, { force = false } = {}) =>
|
||||
executeStepAction: (step, { force = false, skipLock = false } = {}) =>
|
||||
if _.includes(@proxyvisor.validActions, step.action)
|
||||
return @proxyvisor.executeStepAction(step)
|
||||
if !_.includes(@validActions, step.action)
|
||||
return Promise.reject(new Error("Invalid action #{step.action}"))
|
||||
@actionExecutors[step.action](step, { force })
|
||||
@actionExecutors[step.action](step, { force, skipLock })
|
||||
|
||||
getRequiredSteps: (currentState, targetState, stepsInProgress) =>
|
||||
getRequiredSteps: (currentState, targetState, stepsInProgress, ignoreImages = false) =>
|
||||
Promise.join(
|
||||
@images.isCleanupNeeded()
|
||||
@images.getAvailable()
|
||||
@networks.supervisorNetworkReady()
|
||||
(cleanupNeeded, availableImages, supervisorNetworkReady) =>
|
||||
@_inferNextSteps(cleanupNeeded, availableImages, supervisorNetworkReady, currentState, targetState, stepsInProgress)
|
||||
@_inferNextSteps(cleanupNeeded, availableImages, supervisorNetworkReady, currentState, targetState, stepsInProgress, ignoreImages)
|
||||
.then (nextSteps) =>
|
||||
if ignoreImages and _.some(nextSteps, (step) -> step.action == 'fetch')
|
||||
throw new Error('Cannot fetch images while executing an API action')
|
||||
@proxyvisor.getRequiredSteps(availableImages, currentState, targetState, nextSteps.concat(stepsInProgress))
|
||||
.then (proxyvisorSteps) ->
|
||||
return nextSteps.concat(proxyvisorSteps)
|
||||
|
@ -155,7 +155,7 @@ module.exports = class DeviceState extends EventEmitter
|
||||
constructor: ({ @db, @config, @eventTracker }) ->
|
||||
@logger = new Logger({ @eventTracker })
|
||||
@deviceConfig = new DeviceConfig({ @db, @config, @logger })
|
||||
@applications = new ApplicationManager({ @config, @logger, @db, @eventTracker })
|
||||
@applications = new ApplicationManager({ @config, @logger, @db, @eventTracker, deviceState: this })
|
||||
@on 'error', (err) ->
|
||||
console.error('Error in deviceState: ', err, err.stack)
|
||||
@_currentVolatile = {}
|
||||
@ -164,11 +164,9 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@_readLock = Promise.promisify(_lock.async.readLock)
|
||||
@lastSuccessfulUpdate = null
|
||||
@failedUpdates = 0
|
||||
@stepsInProgress = []
|
||||
@applyInProgress = false
|
||||
@lastApplyStart = process.hrtime()
|
||||
@scheduledApply = null
|
||||
@applyContinueScheduled = false
|
||||
@shuttingDown = false
|
||||
@_router = new DeviceStateRouter(this)
|
||||
@router = @_router.router
|
||||
@ -309,8 +307,10 @@ module.exports = class DeviceState extends EventEmitter
|
||||
.then =>
|
||||
@applications.setTarget(target.local.apps, target.dependent, trx)
|
||||
|
||||
getTarget: ({ initial = false } = {}) =>
|
||||
getTarget: ({ initial = false, intermediate = false } = {}) =>
|
||||
@usingReadLockTarget =>
|
||||
if intermediate
|
||||
return @intermediateTarget
|
||||
Promise.props({
|
||||
local: Promise.props({
|
||||
name: @config.get('name')
|
||||
@ -394,16 +394,16 @@ module.exports = class DeviceState extends EventEmitter
|
||||
.catch (err) =>
|
||||
@eventTracker.track('Loading preloaded apps failed', { error: err })
|
||||
|
||||
reboot: (force) =>
|
||||
@applications.stopAll({ force })
|
||||
reboot: (force, skipLock) =>
|
||||
@applications.stopAll({ force, skipLock })
|
||||
.then =>
|
||||
@logger.logSystemMessage('Rebooting', {}, 'Reboot')
|
||||
device.reboot()
|
||||
.tap =>
|
||||
@emit('shutdown')
|
||||
|
||||
shutdown: (force) =>
|
||||
@applications.stopAll({ force })
|
||||
shutdown: (force, skipLock) =>
|
||||
@applications.stopAll({ force, skipLock })
|
||||
.then =>
|
||||
@logger.logSystemMessage('Shutting down', {}, 'Shutdown')
|
||||
device.shutdown()
|
||||
@ -411,70 +411,80 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@shuttingDown = true
|
||||
@emitAsync('shutdown')
|
||||
|
||||
executeStepAction: (step, { force, initial }) =>
|
||||
executeStepAction: (step, { force, initial, skipLock }) =>
|
||||
Promise.try =>
|
||||
if _.includes(@deviceConfig.validActions, step.action)
|
||||
@deviceConfig.executeStepAction(step, { initial })
|
||||
else if _.includes(@applications.validActions, step.action)
|
||||
@applications.executeStepAction(step, { force })
|
||||
@applications.executeStepAction(step, { force, skipLock })
|
||||
else
|
||||
switch step.action
|
||||
when 'reboot'
|
||||
@reboot(force)
|
||||
@reboot(force, skipLock)
|
||||
when 'shutdown'
|
||||
@shutdown(force)
|
||||
@shutdown(force, skipLock)
|
||||
when 'noop'
|
||||
Promise.resolve()
|
||||
else
|
||||
throw new Error("Invalid action #{step.action}")
|
||||
|
||||
applyStepAsync: (step, { force, initial }) =>
|
||||
applyStep: (step, { force, initial, intermediate, skipLock }, updateContext) =>
|
||||
if @shuttingDown
|
||||
return
|
||||
@stepsInProgress.push(step)
|
||||
setImmediate =>
|
||||
@executeStepAction(step, { force, initial })
|
||||
.finally =>
|
||||
@usingInferStepsLock =>
|
||||
_.pullAllWith(@stepsInProgress, [ step ], _.isEqual)
|
||||
.then (stepResult) =>
|
||||
@emitAsync('step-completed', null, step, stepResult)
|
||||
@continueApplyTarget({ force, initial })
|
||||
.catch (err) =>
|
||||
@emitAsync('step-error', err, step)
|
||||
@applyError(err, force, initial)
|
||||
updateContext.stepsInProgress.push(step)
|
||||
@executeStepAction(step, { force, initial, skipLock })
|
||||
.finally =>
|
||||
@usingInferStepsLock ->
|
||||
_.pullAllWith(updateContext.stepsInProgress, [ step ], _.isEqual)
|
||||
.catch (err) =>
|
||||
@emitAsync('step-error', err, step)
|
||||
throw err
|
||||
.then (stepResult) =>
|
||||
@emitAsync('step-completed', null, step, stepResult)
|
||||
@continueApplyTarget({ force, initial, intermediate }, updateContext)
|
||||
|
||||
applyError: (err, force, initial) =>
|
||||
@_applyingSteps = false
|
||||
@applyInProgress = false
|
||||
@failedUpdates += 1
|
||||
@reportCurrentState(update_failed: true)
|
||||
if @scheduledApply?
|
||||
console.log('Updating failed, but there is already another update scheduled immediately: ', err)
|
||||
else
|
||||
delay = Math.min((2 ** @failedUpdates) * 500, 30000)
|
||||
# If there was an error then schedule another attempt briefly in the future.
|
||||
console.log('Scheduling another update attempt due to failure: ', delay, err)
|
||||
@triggerApplyTarget({ force, delay, initial })
|
||||
applyError: (err, force, { initial, intermediate }) =>
|
||||
if !intermediate
|
||||
@applyInProgress = false
|
||||
@failedUpdates += 1
|
||||
@reportCurrentState(update_failed: true)
|
||||
if @scheduledApply?
|
||||
console.log('Updating failed, but there is already another update scheduled immediately: ', err)
|
||||
else
|
||||
delay = Math.min((2 ** @failedUpdates) * 500, 30000)
|
||||
# If there was an error then schedule another attempt briefly in the future.
|
||||
console.log('Scheduling another update attempt due to failure: ', delay, err)
|
||||
@triggerApplyTarget({ force, delay, initial })
|
||||
@emitAsync('apply-target-state-error', err)
|
||||
@emitAsync('apply-target-state-end', err)
|
||||
throw err
|
||||
|
||||
applyTarget: ({ force = false, initial = false } = {}) =>
|
||||
console.log('Applying target state')
|
||||
@usingInferStepsLock =>
|
||||
Promise.join(
|
||||
@getCurrentForComparison()
|
||||
@getTarget({ initial })
|
||||
(currentState, targetState) =>
|
||||
@deviceConfig.getRequiredSteps(currentState, targetState, @stepsInProgress)
|
||||
.then (deviceConfigSteps) =>
|
||||
if !_.isEmpty(deviceConfigSteps)
|
||||
return deviceConfigSteps
|
||||
else
|
||||
@applications.getRequiredSteps(currentState, targetState, @stepsInProgress)
|
||||
)
|
||||
.then (steps) =>
|
||||
if _.isEmpty(steps) and _.isEmpty(@stepsInProgress)
|
||||
applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false } = {}, updateContext) =>
|
||||
if !updateContext?
|
||||
updateContext = {
|
||||
stepsInProgress: []
|
||||
applyContinueScheduled: false
|
||||
}
|
||||
Promise.try =>
|
||||
if !intermediate
|
||||
@applyBlocker
|
||||
.then =>
|
||||
console.log('Applying target state')
|
||||
@usingInferStepsLock =>
|
||||
Promise.join(
|
||||
@getCurrentForComparison()
|
||||
@getTarget({ initial, intermediate })
|
||||
(currentState, targetState) =>
|
||||
@deviceConfig.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress)
|
||||
.then (deviceConfigSteps) =>
|
||||
if !_.isEmpty(deviceConfigSteps)
|
||||
return deviceConfigSteps
|
||||
else
|
||||
@applications.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress, intermediate)
|
||||
)
|
||||
.then (steps) =>
|
||||
if _.isEmpty(steps) and _.isEmpty(updateContext.stepsInProgress)
|
||||
if !intermediate
|
||||
console.log('Finished applying target state')
|
||||
@applyInProgress = false
|
||||
@applications.timeSpentFetching = 0
|
||||
@ -482,31 +492,38 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@lastSuccessfulUpdate = Date.now()
|
||||
@reportCurrentState(update_failed: false, update_pending: false, update_downloaded: false)
|
||||
@emitAsync('apply-target-state-end', null)
|
||||
return
|
||||
return
|
||||
if !intermediate
|
||||
@reportCurrentState(update_pending: true)
|
||||
Promise.map steps, (step) =>
|
||||
@applyStepAsync(step, { force, initial })
|
||||
Promise.map steps, (step) =>
|
||||
@applyStep(step, { force, initial, intermediate, skipLock }, updateContext)
|
||||
.catch (err) =>
|
||||
@applyError(err, force, initial)
|
||||
@applyError(err, force, { initial, intermediate })
|
||||
|
||||
continueApplyTarget: ({ force = false, initial = false } = {}) =>
|
||||
if @applyContinueScheduled
|
||||
return
|
||||
@applyContinueScheduled = true
|
||||
setTimeout( =>
|
||||
@applyContinueScheduled = false
|
||||
@applyTarget({ force, initial })
|
||||
, 1000)
|
||||
return
|
||||
continueApplyTarget: ({ force = false, initial = false, intermediate = false } = {}, updateContext) =>
|
||||
Promise.try =>
|
||||
if !intermediate
|
||||
@applyBlocker
|
||||
.then =>
|
||||
if updateContext.applyContinueScheduled
|
||||
return
|
||||
updateContext.applyContinueScheduled = true
|
||||
Promise.delay(1000)
|
||||
.then =>
|
||||
updateContext.applyContinueScheduled = false
|
||||
@applyTarget({ force, initial, intermediate }, updateContext)
|
||||
|
||||
pauseNextApply: =>
|
||||
@applyBlocker = new Promise (resolve) =>
|
||||
@applyUnblocker = resolve
|
||||
|
||||
resumeNextApply: =>
|
||||
@applyUnblocker?()
|
||||
|
||||
# TODO: Make this and applyTarget work purely with promises, no need to wait on events
|
||||
triggerApplyTarget: ({ force = false, delay = 0, initial = false } = {}) =>
|
||||
if @applyInProgress
|
||||
if !@scheduledApply?
|
||||
@scheduledApply = { force, delay }
|
||||
@once 'apply-target-state-end', =>
|
||||
@triggerApplyTarget(@scheduledApply)
|
||||
@scheduledApply = null
|
||||
else
|
||||
# If a delay has been set it's because we need to hold off before applying again,
|
||||
# so we need to respect the maximum delay that has been passed
|
||||
@ -514,8 +531,18 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@scheduledApply.force or= force
|
||||
return
|
||||
@applyInProgress = true
|
||||
setTimeout( =>
|
||||
Promise.delay(delay)
|
||||
.then =>
|
||||
@lastApplyStart = process.hrtime()
|
||||
@applyTarget({ force, initial })
|
||||
, delay)
|
||||
.finally =>
|
||||
if @scheduledApply?
|
||||
@triggerApplyTarget(@scheduledApply)
|
||||
@scheduledApply = null
|
||||
return
|
||||
|
||||
applyIntermediateTarget: (intermediateTarget, { force = false, skipLock = false } = {}) =>
|
||||
@intermediateTarget = _.cloneDeep(intermediateTarget)
|
||||
@applyTarget({ intermediate: true, force, skipLock })
|
||||
.then =>
|
||||
@intermediateTarget = null
|
||||
|
Loading…
Reference in New Issue
Block a user