mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2024-12-20 14:13:08 +00:00
Simplify the update logic by making fetch and kill (the only long-running actions) happen in the background, and always waiting for all actions before continuing
Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
This commit is contained in:
parent
c0ac2c21a4
commit
3fd52bb0c7
@ -227,7 +227,7 @@ class ApplicationManagerRouter
|
||||
errMsg = 'Service not found, a container must exist for service stop to work.'
|
||||
return res.status(404).send(errMsg)
|
||||
@applications.setTargetVolatileForService(service.imageId, running: false)
|
||||
@applications.executeStepAction(serviceAction('stop', service.serviceId, service, service), { skipLock: true })
|
||||
@applications.executeStepAction(serviceAction('stop', service.serviceId, service, service, { wait: true }), { skipLock: true })
|
||||
.then ->
|
||||
res.status(200).send('OK')
|
||||
.catch (err) ->
|
||||
@ -280,7 +280,8 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
@actionExecutors = {
|
||||
stop: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@services.kill(step.current, { removeContainer: false })
|
||||
wait = step.options?.wait ? false
|
||||
@services.kill(step.current, { removeContainer: false, wait })
|
||||
kill: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
@services.kill(step.current)
|
||||
@ -292,7 +293,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
restart: (step, { force = false, skipLock = false } = {}) =>
|
||||
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
|
||||
Promise.try =>
|
||||
@services.kill(step.current)
|
||||
@services.kill(step.current, { wait: true })
|
||||
.then =>
|
||||
@services.start(step.target)
|
||||
stopAll: (step, { force = false, skipLock = false } = {}) =>
|
||||
@ -310,12 +311,12 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
@images.getAvailable()
|
||||
(opts, availableImages) =>
|
||||
opts.deltaSource = @bestDeltaSource(step.image, availableImages)
|
||||
@images.fetch(step.image, opts)
|
||||
@images.triggerFetch step.image, opts, (success) =>
|
||||
@fetchesInProgress -= 1
|
||||
@timeSpentFetching += process.hrtime(startTime)[0]
|
||||
if success
|
||||
@reportCurrentState(update_downloaded: true)
|
||||
)
|
||||
.finally =>
|
||||
@fetchesInProgress -= 1
|
||||
@timeSpentFetching += process.hrtime(startTime)[0]
|
||||
@reportCurrentState(update_downloaded: true)
|
||||
removeImage: (step) =>
|
||||
@images.remove(step.image)
|
||||
saveImage: (step) =>
|
||||
@ -576,23 +577,20 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
|
||||
# TODO: account for volumes-from, networks-from, links, etc
|
||||
# TODO: support networks instead of only networkMode
|
||||
_dependenciesMetForServiceStart: (target, networkPairs, volumePairs, pendingPairs, stepsInProgress) ->
|
||||
_dependenciesMetForServiceStart: (target, networkPairs, volumePairs, pendingPairs) ->
|
||||
# for dependsOn, check no install or update pairs have that service
|
||||
dependencyUnmet = _.some target.dependsOn ? [], (dependency) ->
|
||||
_.find(pendingPairs, (pair) -> pair.target?.serviceName == dependency)? or _.find(stepsInProgress, (step) -> step.target?.serviceName == dependency)?
|
||||
_.find(pendingPairs, (pair) -> pair.target?.serviceName == dependency)?
|
||||
if dependencyUnmet
|
||||
return false
|
||||
# for networks and volumes, check no network pairs have that volume name
|
||||
if _.find(networkPairs, (pair) -> "#{target.appId}_#{pair.target?.name}" == target.networkMode)?
|
||||
return false
|
||||
if _.find(stepsInProgress, (step) -> step.model == 'network' and "#{target.appId}_#{step.target?.name}" == target.networkMode)?
|
||||
return false
|
||||
volumeUnmet = _.some target.volumes, (volumeDefinition) ->
|
||||
[ sourceName, destName ] = volumeDefinition.split(':')
|
||||
if !destName? # If this is not a named volume, ignore it
|
||||
return false
|
||||
return _.find(volumePairs, (pair) -> "#{target.appId}_#{pair.target?.name}" == sourceName)? or
|
||||
_.find(stepsInProgress, (step) -> step.model == 'volume' and "#{target.appId}_#{step.target?.name}" == sourceName)?
|
||||
return _.find(volumePairs, (pair) -> "#{target.appId}_#{pair.target?.name}" == sourceName)?
|
||||
return !volumeUnmet
|
||||
|
||||
# Unless the update strategy requires an early kill (i.e. kill-then-download, delete-then-download), we only want
|
||||
@ -606,7 +604,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
return false
|
||||
return true
|
||||
|
||||
_nextStepsForNetworkOrVolume: ({ current, target }, currentApp, changingPairs, dependencyComparisonFn, model, stepsInProgress) ->
|
||||
_nextStepsForNetworkOrVolume: ({ current, target }, currentApp, changingPairs, dependencyComparisonFn, model) ->
|
||||
# Check none of the currentApp.services use this network or volume
|
||||
if current?
|
||||
dependencies = _.filter currentApp.services, (service) ->
|
||||
@ -618,24 +616,24 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
# we have to kill them before removing the network/volume (e.g. when we're only updating the network config)
|
||||
steps = []
|
||||
for dependency in dependencies
|
||||
if !_.some(changingPairs, (pair) -> pair.serviceId == dependency.serviceId) and !_.find(stepsInProgress, (step) -> step.serviceId == dependency.serviceId)?
|
||||
if dependency.status != 'Stopping' and !_.some(changingPairs, (pair) -> pair.serviceId == dependency.serviceId)
|
||||
steps.push(serviceAction('kill', dependency.serviceId, dependency))
|
||||
return steps
|
||||
else if target?
|
||||
return [{ action: 'createNetworkOrVolume', model, target }]
|
||||
|
||||
_nextStepsForNetwork: ({ current, target }, currentApp, changingPairs, stepsInProgress) =>
|
||||
_nextStepsForNetwork: ({ current, target }, currentApp, changingPairs) =>
|
||||
dependencyComparisonFn = (service, current) ->
|
||||
service.networkMode == "#{service.appId}_#{current?.name}"
|
||||
@_nextStepsForNetworkOrVolume({ current, target }, currentApp, changingPairs, dependencyComparisonFn, 'network', stepsInProgress)
|
||||
@_nextStepsForNetworkOrVolume({ current, target }, currentApp, changingPairs, dependencyComparisonFn, 'network')
|
||||
|
||||
_nextStepsForVolume: ({ current, target }, currentApp, changingPairs, stepsInProgress) ->
|
||||
_nextStepsForVolume: ({ current, target }, currentApp, changingPairs) ->
|
||||
# Check none of the currentApp.services use this network or volume
|
||||
dependencyComparisonFn = (service, current) ->
|
||||
_.some service.volumes, (volumeDefinition) ->
|
||||
sourceName = volumeDefinition.split(':')[0]
|
||||
sourceName == "#{service.appId}_#{current?.name}"
|
||||
@_nextStepsForNetworkOrVolume({ current, target }, currentApp, changingPairs, dependencyComparisonFn, 'volume', stepsInProgress)
|
||||
@_nextStepsForNetworkOrVolume({ current, target }, currentApp, changingPairs, dependencyComparisonFn, 'volume')
|
||||
|
||||
# Infers steps that do not require creating a new container
|
||||
_updateContainerStep: (current, target) ->
|
||||
@ -679,15 +677,20 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
}
|
||||
|
||||
_nextStepForService: ({ current, target }, updateContext) =>
|
||||
{ targetApp, networkPairs, volumePairs, installPairs, updatePairs, stepsInProgress, availableImages } = updateContext
|
||||
if _.find(stepsInProgress, (step) -> step.serviceId == target.serviceId)?
|
||||
# There is already a step in progress for this service, so we wait
|
||||
return null
|
||||
{ targetApp, networkPairs, volumePairs, installPairs, updatePairs, availableImages, downloading } = updateContext
|
||||
if current?.status == 'Stopping'
|
||||
# There is already a kill step in progress for this service, so we wait
|
||||
return { action: 'noop' }
|
||||
|
||||
needsDownload = !_.some availableImages, (image) =>
|
||||
image.dockerImageId == target.image or @images.isSameImage(image, { name: target.imageName })
|
||||
image.dockerImageId == target?.image or @images.isSameImage(image, { name: target.imageName })
|
||||
|
||||
# This service needs an image download but it's currently downloading, so we wait
|
||||
if needsDownload and target?.imageId in downloading
|
||||
return { action: 'noop' }
|
||||
|
||||
dependenciesMetForStart = =>
|
||||
@_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs), stepsInProgress)
|
||||
@_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs))
|
||||
dependenciesMetForKill = =>
|
||||
!needsDownload and @_dependenciesMetForServiceKill(target, targetApp, availableImages)
|
||||
|
||||
@ -709,7 +712,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
timeout = checkInt(target.labels['io.resin.update.handover-timeout'])
|
||||
return @_strategySteps[strategy](current, target, needsDownload, dependenciesMetForStart, dependenciesMetForKill, needsSpecialKill, timeout)
|
||||
|
||||
_nextStepsForAppUpdate: (currentApp, targetApp, availableImages = [], stepsInProgress = []) =>
|
||||
_nextStepsForAppUpdate: (currentApp, targetApp, availableImages = [], downloading = []) =>
|
||||
emptyApp = { services: [], volumes: {}, networks: {} }
|
||||
if !targetApp?
|
||||
targetApp = emptyApp
|
||||
@ -735,12 +738,14 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
steps = []
|
||||
# All removePairs get a 'kill' action
|
||||
for pair in removePairs
|
||||
if !_.find(stepsInProgress, (step) -> step.serviceId == pair.current.serviceId)?
|
||||
if pair.current.status != 'Stopping'
|
||||
steps.push(serviceAction('kill', pair.current.serviceId, pair.current, null))
|
||||
else
|
||||
steps.push({ action: 'noop' })
|
||||
# next step for install pairs in download - start order, but start requires dependencies, networks and volumes met
|
||||
# next step for update pairs in order by update strategy. start requires dependencies, networks and volumes met.
|
||||
for pair in installPairs.concat(updatePairs)
|
||||
step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, stepsInProgress, availableImages })
|
||||
step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, availableImages, downloading })
|
||||
if step?
|
||||
steps.push(step)
|
||||
# next step for network pairs - remove requires services killed, create kill if no pairs or steps affect that service
|
||||
@ -751,7 +756,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
for pair in volumePairs
|
||||
pairSteps = @_nextStepsForVolume(pair, currentApp, removePairs.concat(updatePairs))
|
||||
steps = steps.concat(pairSteps)
|
||||
return steps
|
||||
return _.map(steps, (step) -> _.assign({}, step, { appId }))
|
||||
|
||||
normaliseAppForDB: (app) =>
|
||||
services = _.map app.services, (s, serviceId) ->
|
||||
@ -916,7 +921,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
return notUsedForDelta and notUsedByProxyvisor
|
||||
return { imagesToSave, imagesToRemove }
|
||||
|
||||
_inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress, ignoreImages, localMode) =>
|
||||
_inferNextSteps: (cleanupNeeded, availableImages, downloading, supervisorNetworkReady, current, target, ignoreImages, localMode) =>
|
||||
Promise.try =>
|
||||
if checkTruthy(localMode)
|
||||
target = _.cloneDeep(target)
|
||||
@ -930,7 +935,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
if !supervisorNetworkReady
|
||||
nextSteps.push({ action: 'ensureSupervisorNetwork' })
|
||||
else
|
||||
if !ignoreImages and !_.some(stepsInProgress, (step) -> step.action == 'fetch')
|
||||
if !ignoreImages and _.isEmpty(downloading)
|
||||
if cleanupNeeded
|
||||
nextSteps.push({ action: 'cleanup' })
|
||||
{ imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages)
|
||||
@ -943,19 +948,16 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
if _.isEmpty(nextSteps)
|
||||
allAppIds = _.union(_.keys(currentByAppId), _.keys(targetByAppId))
|
||||
for appId in allAppIds
|
||||
nextSteps = nextSteps.concat(@_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], availableImages, stepsInProgress))
|
||||
return @_removeDuplicateSteps(nextSteps, stepsInProgress)
|
||||
|
||||
_removeDuplicateSteps: (nextSteps, stepsInProgress) ->
|
||||
withoutProgressDups = _.filter nextSteps, (step) ->
|
||||
!_.find(stepsInProgress, (s) -> _.isEqual(s, step))?
|
||||
_.uniqWith(withoutProgressDups, _.isEqual)
|
||||
nextSteps = nextSteps.concat(@_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], availableImages, downloading))
|
||||
if !ignoreImages and _.isEmpty(nextSteps) and !_.isEmpty(downloading)
|
||||
nextSteps.push({ action: 'noop' })
|
||||
return _.uniqWith(nextSteps, _.isEqual)
|
||||
|
||||
stopAll: ({ force = false, skipLock = false } = {}) =>
|
||||
@services.getAll()
|
||||
.map (service) =>
|
||||
@_lockingIfNecessary service.appId, { force, skipLock }, =>
|
||||
@services.kill(service, { removeContainer: false })
|
||||
@services.kill(service, { removeContainer: false, wait: true })
|
||||
|
||||
_lockingIfNecessary: (appId, { force = false, skipLock = false } = {}, fn) =>
|
||||
if skipLock
|
||||
@ -973,18 +975,19 @@ module.exports = class ApplicationManager extends EventEmitter
|
||||
return Promise.reject(new Error("Invalid action #{step.action}"))
|
||||
@actionExecutors[step.action](step, { force, skipLock })
|
||||
|
||||
getRequiredSteps: (currentState, targetState, stepsInProgress, ignoreImages = false) =>
|
||||
getRequiredSteps: (currentState, targetState, ignoreImages = false) =>
|
||||
Promise.join(
|
||||
@images.isCleanupNeeded()
|
||||
@images.getAvailable()
|
||||
@images.getDownloadingImageIds()
|
||||
@networks.supervisorNetworkReady()
|
||||
@config.get('localMode')
|
||||
(cleanupNeeded, availableImages, supervisorNetworkReady, localMode) =>
|
||||
@_inferNextSteps(cleanupNeeded, availableImages, supervisorNetworkReady, currentState, targetState, stepsInProgress, ignoreImages, localMode)
|
||||
(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, localMode) =>
|
||||
@_inferNextSteps(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, currentState, targetState, ignoreImages, localMode)
|
||||
.then (nextSteps) =>
|
||||
if ignoreImages and _.some(nextSteps, (step) -> step.action == 'fetch')
|
||||
throw new Error('Cannot fetch images while executing an API action')
|
||||
@proxyvisor.getRequiredSteps(availableImages, currentState, targetState, nextSteps.concat(stepsInProgress))
|
||||
@proxyvisor.getRequiredSteps(availableImages, downloading, currentState, targetState, nextSteps)
|
||||
.then (proxyvisorSteps) ->
|
||||
return nextSteps.concat(proxyvisorSteps)
|
||||
)
|
||||
|
@ -38,10 +38,12 @@ module.exports = class Images extends EventEmitter
|
||||
delete @volatileState[imageId]
|
||||
@emit('change')
|
||||
|
||||
fetch: (image, opts) =>
|
||||
triggerFetch: (image, opts, onFinish) =>
|
||||
onProgress = (progress) =>
|
||||
@reportChange(image.imageId, { downloadProgress: progress.percentage })
|
||||
|
||||
onFinish ?= _.noop
|
||||
|
||||
@normalise(image.name)
|
||||
.then (imageName) =>
|
||||
image = _.clone(image)
|
||||
@ -49,15 +51,18 @@ module.exports = class Images extends EventEmitter
|
||||
@markAsSupervised(image)
|
||||
.then =>
|
||||
@inspectByName(imageName)
|
||||
.tap (img) =>
|
||||
.then (img) =>
|
||||
@db.models('image').update({ dockerImageId: img.Id }).where(image)
|
||||
.then ->
|
||||
onFinish(true)
|
||||
return
|
||||
.catch =>
|
||||
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 }))
|
||||
Promise.try =>
|
||||
if validation.checkTruthy(opts.delta) and opts.deltaSource and opts.deltaSource != 'resin/scratch'
|
||||
@logger.logSystemEvent(logTypes.downloadImageDelta, { image })
|
||||
@inspectByName(opts.deltaSource)
|
||||
.then (srcImage) ->
|
||||
.then (srcImage) =>
|
||||
opts.deltaSourceId = srcImage.Id
|
||||
@docker.rsyncImageWithProgress(imageName, opts, onProgress)
|
||||
.tap (id) =>
|
||||
@ -72,12 +77,15 @@ module.exports = class Images extends EventEmitter
|
||||
@db.models('image').update({ dockerImageId: id }).where(image)
|
||||
.then =>
|
||||
@logger.logSystemEvent(logTypes.downloadImageSuccess, { image })
|
||||
@inspectByName(imageName)
|
||||
return true
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.downloadImageError, { image, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
return false
|
||||
.then (success) =>
|
||||
@reportChange(image.imageId)
|
||||
onFinish(success)
|
||||
return
|
||||
return
|
||||
|
||||
format: (image) ->
|
||||
image.serviceId ?= null
|
||||
@ -184,6 +192,10 @@ module.exports = class Images extends EventEmitter
|
||||
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
|
||||
_.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages))
|
||||
|
||||
getDownloadingImageIds: =>
|
||||
Promise.try =>
|
||||
return _.map(_.keys(_.pickBy(@volatileState, (s) -> s.status == 'Downloading')), validation.checkInt)
|
||||
|
||||
cleanupDatabase: =>
|
||||
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
|
||||
Promise.map supervisedImages, (image) =>
|
||||
|
@ -40,46 +40,45 @@ module.exports = class ServiceManager extends EventEmitter
|
||||
reportNewStatus: (containerId, service, status) =>
|
||||
@reportChange(containerId, _.merge({ status }, _.pick(service, [ 'imageId', 'appId', 'releaseId', 'commit' ])))
|
||||
|
||||
_killContainer: (containerId, service = {}, { removeContainer = true }) =>
|
||||
@logger.logSystemEvent(logTypes.stopService, { service })
|
||||
if service.imageId?
|
||||
@reportNewStatus(containerId, service, 'Stopping')
|
||||
containerObj = @docker.getContainer(containerId)
|
||||
containerObj.stop()
|
||||
.then ->
|
||||
if removeContainer
|
||||
containerObj.remove(v: true)
|
||||
.catch (err) =>
|
||||
# Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison
|
||||
# reasons.
|
||||
statusCode = checkInt(err.statusCode)
|
||||
# 304 means the container was already stopped - so we can just remove it
|
||||
if statusCode is 304
|
||||
@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
|
||||
if removeContainer
|
||||
return containerObj.remove(v: true)
|
||||
return
|
||||
# 404 means the container doesn't exist, precisely what we want! :D
|
||||
if statusCode is 404
|
||||
@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
|
||||
return
|
||||
throw err
|
||||
.tap =>
|
||||
delete @containerHasDied[containerId]
|
||||
.tap =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
_killContainer: (containerId, service = {}, { removeContainer = true, wait = false }) =>
|
||||
Promise.try =>
|
||||
@logger.logSystemEvent(logTypes.stopService, { service })
|
||||
if service.imageId?
|
||||
@reportChange(containerId)
|
||||
@reportNewStatus(containerId, service, 'Stopping')
|
||||
containerObj = @docker.getContainer(containerId)
|
||||
killPromise = containerObj.stop().then ->
|
||||
if removeContainer
|
||||
containerObj.remove(v: true)
|
||||
.catch (err) =>
|
||||
# Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison
|
||||
# reasons.
|
||||
statusCode = checkInt(err.statusCode)
|
||||
# 304 means the container was already stopped - so we can just remove it
|
||||
if statusCode is 304
|
||||
@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
|
||||
if removeContainer
|
||||
return containerObj.remove(v: true)
|
||||
return
|
||||
# 404 means the container doesn't exist, precisely what we want! :D
|
||||
if statusCode is 404
|
||||
@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
|
||||
return
|
||||
throw err
|
||||
.tap =>
|
||||
delete @containerHasDied[containerId]
|
||||
.tap =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
|
||||
.finally =>
|
||||
if service.imageId?
|
||||
@reportChange(containerId)
|
||||
if wait
|
||||
return killPromise
|
||||
return
|
||||
|
||||
kill: (service, { removeContainer = true } = {}) =>
|
||||
@_killContainer(service.containerId, service, { removeContainer })
|
||||
.then ->
|
||||
service.running = false
|
||||
return service
|
||||
kill: (service, { removeContainer = true, wait = false } = {}) =>
|
||||
@_killContainer(service.containerId, service, { removeContainer, wait })
|
||||
|
||||
getAllByAppId: (appId) =>
|
||||
@getAll("io.resin.app-id=#{appId}")
|
||||
@ -161,6 +160,10 @@ module.exports = class ServiceManager extends EventEmitter
|
||||
.mapSeries (container) =>
|
||||
@docker.getContainer(container.Id).inspect()
|
||||
.then(Service.fromContainer)
|
||||
.then (service) =>
|
||||
if @volatileState[service.containerId]?.status?
|
||||
service.status = @volatileState[service.containerId].status
|
||||
return service
|
||||
|
||||
# Returns an array with the container(s) matching a service by appId, commit, image and environment
|
||||
get: (service) =>
|
||||
|
@ -140,7 +140,7 @@ module.exports = class DeviceConfig
|
||||
return true
|
||||
else return false
|
||||
|
||||
getRequiredSteps: (currentState, targetState, stepsInProgress) =>
|
||||
getRequiredSteps: (currentState, targetState) =>
|
||||
current = _.clone(currentState.local?.config ? {})
|
||||
target = _.clone(targetState.local?.config ? {})
|
||||
steps = []
|
||||
@ -192,13 +192,7 @@ module.exports = class DeviceConfig
|
||||
})
|
||||
return
|
||||
.then ->
|
||||
needsWait = !_.isEmpty(steps)
|
||||
filteredSteps = _.filter steps, (step) ->
|
||||
!_.find(stepsInProgress, (stepInProgress) -> _.isEqual(stepInProgress, step))?
|
||||
|
||||
if _.isEmpty(filteredSteps) and needsWait
|
||||
return [{ action: 'noop' }]
|
||||
else return filteredSteps
|
||||
return steps
|
||||
|
||||
executeStepAction: (step, opts) =>
|
||||
@actionExecutors[step.action](step, opts)
|
||||
|
@ -443,24 +443,17 @@ module.exports = class DeviceState extends EventEmitter
|
||||
else
|
||||
throw new Error("Invalid action #{step.action}")
|
||||
|
||||
applyStep: (step, { force, initial, intermediate, skipLock }, updateContext) =>
|
||||
applyStep: (step, { force, initial, intermediate, skipLock }) =>
|
||||
if @shuttingDown
|
||||
return
|
||||
@usingInferStepsLock ->
|
||||
updateContext.stepsInProgress.push(step)
|
||||
.then =>
|
||||
@executeStepAction(step, { force, initial, skipLock })
|
||||
.finally =>
|
||||
@usingInferStepsLock ->
|
||||
_.pullAllWith(updateContext.stepsInProgress, [ step ], _.isEqual)
|
||||
@executeStepAction(step, { force, initial, skipLock })
|
||||
.catch (err) =>
|
||||
@emitAsync('step-error', err, step)
|
||||
throw err
|
||||
.then (stepResult) =>
|
||||
@emitAsync('step-completed', null, step, stepResult)
|
||||
@continueApplyTarget({ force, initial, intermediate }, updateContext)
|
||||
|
||||
applyError: (err, { force, initial, intermediate }, updateContext) =>
|
||||
applyError: (err, { force, initial, intermediate }) =>
|
||||
@emitAsync('apply-target-state-error', err)
|
||||
@emitAsync('apply-target-state-end', err)
|
||||
if !intermediate
|
||||
@ -468,24 +461,16 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@reportCurrentState(update_failed: true)
|
||||
if @scheduledApply?
|
||||
console.log("Updating failed, but there's another update scheduled immediately: ", err)
|
||||
force = force or @scheduledApply.force
|
||||
delay = @scheduledApply.delay
|
||||
@scheduledApply = null
|
||||
else
|
||||
delay = Math.min((2 ** @failedUpdates) * 500, 30000)
|
||||
# If there was an error then schedule another attempt briefly in the future.
|
||||
console.log('Scheduling another update attempt due to failure: ', delay, err)
|
||||
@continueApplyTarget({ force, initial, intermediate }, updateContext, delay)
|
||||
@triggerApplyTarget({ force, delay, initial })
|
||||
else
|
||||
updateContext.applyContinueScheduled = true
|
||||
throw err
|
||||
|
||||
applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false } = {}, updateContext) =>
|
||||
if !updateContext?
|
||||
updateContext = {
|
||||
stepsInProgress: []
|
||||
applyContinueScheduled: false
|
||||
}
|
||||
applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false } = {}) =>
|
||||
nextDelay = 200
|
||||
Promise.try =>
|
||||
if !intermediate
|
||||
@applyBlocker
|
||||
@ -496,42 +481,36 @@ module.exports = class DeviceState extends EventEmitter
|
||||
@getCurrentForComparison()
|
||||
@getTarget({ initial, intermediate })
|
||||
(currentState, targetState) =>
|
||||
@deviceConfig.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress)
|
||||
@deviceConfig.getRequiredSteps(currentState, targetState)
|
||||
.then (deviceConfigSteps) =>
|
||||
if !_.isEmpty(deviceConfigSteps)
|
||||
return deviceConfigSteps
|
||||
else
|
||||
@applications.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress, intermediate)
|
||||
@applications.getRequiredSteps(currentState, targetState, intermediate)
|
||||
)
|
||||
.then (steps) =>
|
||||
if _.isEmpty(steps) and _.isEmpty(updateContext.stepsInProgress)
|
||||
if _.isEmpty(steps)
|
||||
@emitAsync('apply-target-state-end', null)
|
||||
if !intermediate
|
||||
console.log('Finished applying target state')
|
||||
@applications.timeSpentFetching = 0
|
||||
@failedUpdates = 0
|
||||
@lastSuccessfulUpdate = Date.now()
|
||||
@reportCurrentState(update_failed: false, update_pending: false, update_downloaded: false)
|
||||
@emitAsync('apply-target-state-end', null)
|
||||
return
|
||||
if !intermediate
|
||||
@reportCurrentState(update_pending: true)
|
||||
if _.every(steps, (step) -> step.action == 'noop')
|
||||
nextDelay = 1000
|
||||
Promise.map steps, (step) =>
|
||||
@applyStep(step, { force, initial, intermediate, skipLock }, updateContext)
|
||||
.catch (err) =>
|
||||
@applyError(err, { force, initial, intermediate }, updateContext)
|
||||
|
||||
continueApplyTarget: ({ force = false, initial = false, intermediate = false } = {}, updateContext, delay = 100) =>
|
||||
Promise.try =>
|
||||
if !intermediate
|
||||
@applyBlocker
|
||||
.then =>
|
||||
if updateContext.applyContinueScheduled
|
||||
return
|
||||
updateContext.applyContinueScheduled = true
|
||||
Promise.delay(delay)
|
||||
console.log('Running ' + step.action)
|
||||
@applyStep(step, { force, initial, intermediate, skipLock })
|
||||
.then ->
|
||||
Promise.delay(nextDelay)
|
||||
.then =>
|
||||
updateContext.applyContinueScheduled = false
|
||||
@applyTarget({ force, initial, intermediate }, updateContext)
|
||||
@applyTarget({ force, initial, intermediate, skipLock })
|
||||
.catch (err) =>
|
||||
@applyError(err, { force, initial, intermediate })
|
||||
|
||||
pausingApply: (fn) =>
|
||||
lock = =>
|
||||
|
@ -512,7 +512,48 @@ module.exports = class Proxyvisor
|
||||
dependent: true
|
||||
}
|
||||
|
||||
getRequiredSteps: (availableImages, current, target, stepsInProgress) =>
|
||||
nextStepsForDependentApp: (appId, availableImages, downloading, current, target, currentDevices, targetDevices, stepsInProgress) =>
|
||||
# - if there's current but not target, push a removeDependentApp step
|
||||
if !target?
|
||||
return [{
|
||||
action: 'removeDependentApp'
|
||||
appId: current.appId
|
||||
}]
|
||||
|
||||
if _.some(stepsInProgress, (step) -> step.appId == target.parentApp)
|
||||
return [{ action: 'noop' }]
|
||||
|
||||
needsDownload = target.commit? and target.image? and !@_imageAvailable(target.image, availableImages)
|
||||
|
||||
# - if toBeDownloaded includes this app, push a fetch step
|
||||
if needsDownload
|
||||
if target.imageId in downloading
|
||||
return [{ action: 'noop' }]
|
||||
else
|
||||
return [{
|
||||
action: 'fetch'
|
||||
appId
|
||||
image: @imageForDependentApp(target)
|
||||
}]
|
||||
|
||||
devicesDiffer = @_compareDevices(currentDevices, targetDevices, appId)
|
||||
# - if current doesn't match target, or the devices differ, push an updateDependentTargets step
|
||||
if !_.isEqual(current, target) or devicesDiffer
|
||||
return [{
|
||||
action: 'updateDependentTargets'
|
||||
devices: targetDevices
|
||||
app: target
|
||||
appId
|
||||
}]
|
||||
|
||||
# if we got to this point, the current app is up to date and devices have the
|
||||
# correct targetCommit, targetEnvironment and targetConfig.
|
||||
hookStep = @_getHookStep(currentDevices, appId)
|
||||
if !_.isEmpty(hookStep.devices)
|
||||
return [ hookStep ]
|
||||
return []
|
||||
|
||||
getRequiredSteps: (availableImages, downloading, current, target, stepsInProgress) =>
|
||||
steps = []
|
||||
Promise.try =>
|
||||
targetApps = _.keyBy(target.dependent?.apps ? [], 'appId')
|
||||
@ -521,54 +562,18 @@ module.exports = class Proxyvisor
|
||||
currentAppIds = _.keys(currentApps)
|
||||
allAppIds = _.union(targetAppIds, currentAppIds)
|
||||
|
||||
toBeDownloaded = _.filter targetAppIds, (appId) =>
|
||||
return targetApps[appId].commit? and targetApps[appId].image? and !@_imageAvailable(targetApps[appId].image, availableImages)
|
||||
|
||||
appIdsToCheck = _.filter allAppIds, (appId) ->
|
||||
# - if a step is in progress for this appId, ignore
|
||||
!_.some(steps.concat(stepsInProgress), (step) -> step.appId == appId)
|
||||
for appId in appIdsToCheck
|
||||
# - if there's current but not target, push a removeDependentApp step
|
||||
if !targetApps[appId]?
|
||||
steps.push({
|
||||
action: 'removeDependentApp'
|
||||
appId
|
||||
})
|
||||
return
|
||||
|
||||
# - if toBeDownloaded includes this app, push a fetch step
|
||||
if _.includes(toBeDownloaded, appId)
|
||||
steps.push({
|
||||
action: 'fetch'
|
||||
appId
|
||||
image: @imageForDependentApp(targetApps[appId])
|
||||
})
|
||||
return
|
||||
|
||||
for appId in allAppIds
|
||||
devicesForApp = (devices) ->
|
||||
_.filter devices, (d) ->
|
||||
_.includes(_.keys(d.apps), appId)
|
||||
|
||||
currentDevices = devicesForApp(current.dependent.devices)
|
||||
targetDevices = devicesForApp(target.dependent.devices)
|
||||
|
||||
devicesDiffer = @_compareDevices(currentDevices, targetDevices, appId)
|
||||
# - if current doesn't match target, or the devices differ, push an updateDependentTargets step
|
||||
if !_.isEqual(currentApps[appId], targetApps[appId]) or devicesDiffer
|
||||
steps.push({
|
||||
action: 'updateDependentTargets'
|
||||
devices: targetDevices
|
||||
app: targetApps[appId]
|
||||
appId
|
||||
})
|
||||
return
|
||||
|
||||
# if we got to this point, the current app is up to date and devices have the
|
||||
# correct targetCommit, targetEnvironment and targetConfig.
|
||||
hookStep = @_getHookStep(currentDevices, appId)
|
||||
if !_.isEmpty(hookStep.devices)
|
||||
steps.push(hookStep)
|
||||
.then ->
|
||||
stepsForApp = @nextStepsForDependentApp(appId, availableImages, downloading,
|
||||
currentApps[appId], targetApps[appId],
|
||||
currentDevices, targetDevices,
|
||||
stepsInProgress)
|
||||
steps = steps.concat(stepsForApp)
|
||||
return steps
|
||||
|
||||
getHookEndpoint: (appId) =>
|
||||
|
Loading…
Reference in New Issue
Block a user