diff --git a/src/application-manager.coffee b/src/application-manager.coffee index cdafa42e..cb1e07fa 100644 --- a/src/application-manager.coffee +++ b/src/application-manager.coffee @@ -44,11 +44,11 @@ fetchAction = (service) -> image: imageForService(service) serviceId: service.serviceId } -# TODO: implement v2 endpoints +# TODO: implement additional v2 endpoints # v1 endpoins only work for single-container apps as they assume the app has a single service. class ApplicationManagerRouter constructor: (@applications) -> - { @proxyvisor, @eventTracker } = @applications + { @proxyvisor, @eventTracker, @deviceState, @_lockingIfNecessary } = @applications @router = express.Router() @router.use(bodyParser.urlencoded(extended: true)) @router.use(bodyParser.json()) @@ -165,10 +165,71 @@ class ApplicationManagerRouter res.status(200).json(Data: 'OK', Error: '') .catch (err) -> res.status(503).send(err?.message or err or 'Unknown error') + + @router.post '/v2/applications/:appId/purge', (req, res) => + { force } = req.body + { appId } = req.params + @_lockingIfNecessary appId, { force }, => + @deviceState.getCurrentForComparison() + .then (currentState) => + app = currentState.local.apps[appId] + purgedApp = _.cloneDeep(app) + purgedApp.services = [] + purgedApp.volumes = {} + currentState.local.apps[appId] = purgedApp + @deviceState.applyIntermediateTarget(currentState, { skipLock: true }) + .then => + currentState.local.apps[appId] = app + @deviceState.applyIntermediateTarget(currentState, { skipLock: true }) + .then -> + res.status(200).send('OK') + .catch (err) -> + res.status(503).send(err?.message or err or 'Unknown error') + + @router.post '/v2/applications/:appId/restart-service', (req, res) => + { imageId, force } = req.body + { appId } = req.params + @_lockingIfNecessary appId, { force }, => + @applications.getCurrentApp(appId) + .then (app) => + if !app? + errMsg = "App not found: an app needs to be installed for purge to work. + If you've recently moved this device from another app, + please push an app and wait for it to be installed first." + return res.status(404).send(errMsg) + service = _.find(app.services, (s) -> s.imageId == imageId) + if !service? + errMsg = 'Service not found, a container must exist for service restart to work.' + return res.status(404).send(errMsg) + @applications.executeStepAction(serviceAction('restart', service.serviceId, service, service), { skipLock: true }) + .then -> + res.status(200).send('OK') + .catch (err) -> + res.status(503).send(err?.message or err or 'Unknown error') + + @router.post '/v2/applications/:appId/restart', (req, res) => + { force } = req.body + { appId } = req.params + @_lockingIfNecessary appId, { force }, => + @deviceState.getCurrentForComparison() + .then (currentState) => + app = currentState.local.apps[appId] + stoppedApp = _.cloneDeep(app) + stoppedApp.services = [] + currentState.local.apps[appId] = stoppedApp + @deviceState.applyIntermediateTarget(currentState, { skipLock: true }) + .then => + currentState.local.apps[appId] = app + @deviceState.applyIntermediateTarget(currentState, { skipLock: true }) + .then -> + res.status(200).send('OK') + .catch (err) -> + res.status(503).send(err?.message or err or 'Unknown error') + @router.use(@proxyvisor.router) module.exports = class ApplicationManager extends EventEmitter - constructor: ({ @logger, @config, @db, @eventTracker }) -> + constructor: ({ @logger, @config, @db, @eventTracker, @deviceState }) -> @docker = new Docker() @images = new Images({ @docker, @logger, @db }) @services = new ServiceManager({ @docker, @logger, @images, @config }) @@ -179,21 +240,21 @@ module.exports = class ApplicationManager extends EventEmitter @fetchesInProgress = 0 @_targetVolatilePerServiceId = {} @actionExecutors = { - stop: (step, { force = false } = {}) => - @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + stop: (step, { force = false, skipLock = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, => @services.kill(step.current, { removeContainer: false }) - kill: (step, { force = false } = {}) => - @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + kill: (step, { force = false, skipLock = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, => @services.kill(step.current) .then => if step.options?.removeImage @images.removeByDockerId(step.current.image) updateMetadata: (step) => @services.updateMetadata(step.current, step.target) - purge: (step, { force = false } = {}) => + purge: (step, { force = false, skipLock = false } = {}) => appId = step.appId @logger.logSystemMessage("Purging data for app #{appId}", { appId }, 'Purge data') - @_lockingIfNecessary appId, { force, skipLock: step.options?.skipLock }, => + @_lockingIfNecessary appId, { force, skipLock: skipLock or step.options?.skipLock }, => @getCurrentApp(appId) .then (app) => if !_.isEmpty(app?.services) @@ -210,18 +271,18 @@ module.exports = class ApplicationManager extends EventEmitter .catch (err) => @logger.logSystemMessage("Error purging data: #{err}", { appId, error: err }, 'Purge data error') throw err - restart: (step, { force = false } = {}) => - @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + restart: (step, { force = false, skipLock = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, => Promise.try => @services.kill(step.current) .then => @services.start(step.target) - stopAll: (step, { force = false } = {}) => - @stopAll({ force }) + stopAll: (step, { force = false, skipLock = false } = {}) => + @stopAll({ force, skipLock }) start: (step) => @services.start(step.target) - handover: (step, { force = false } = {}) => - @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + handover: (step, { force = false, skipLock = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, => @services.handover(step.current, step.target) fetch: (step) => startTime = process.hrtime() @@ -327,29 +388,26 @@ module.exports = class ApplicationManager extends EventEmitter @proxyvisor.getCurrentStates() _buildApps: (services, networks, volumes) -> - apps = _.keyBy(_.map(_.uniq(_.map(services, 'appId')), (appId) -> { appId }), 'appId') + apps = {} # We iterate over the current running services and add them to the current state # of the app they belong to. for service in services appId = service.appId - apps[appId].services ?= [] + apps[appId] ?= { appId, services: [], volumes: {}, networks: {} } apps[appId].services.push(service) for network in networks appId = network.appId - apps[appId] ?= { appId } - apps[appId].networks ?= {} + apps[appId] ?= { appId, services: [], volumes: {}, networks: {} } apps[appId].networks[network.name] = network.config for volume in volumes appId = volume.appId - apps[appId] ?= { appId } - apps[appId].volumes ?= {} + apps[appId] ?= { appId, services: [], volumes: {}, networks: {} } apps[appId].volumes[volume.name] = volume.config - # We return the apps as an array - return _.values(apps) + return apps getCurrentForComparison: => Promise.join( @@ -366,7 +424,7 @@ module.exports = class ApplicationManager extends EventEmitter @networks.getAllByAppId(appId) @volumes.getAllByAppId(appId) (services, networks, volumes) => - return @_buildApps(services, networks, volumes)[0] + return @_buildApps(services, networks, volumes)[appId] ) getTargetApp: (appId) => @@ -621,7 +679,8 @@ module.exports = class ApplicationManager extends EventEmitter # There is already a step in progress for this service, so we wait return null - needsDownload = !_.some(availableImages, (image) => @images.isSameImage(image, { name: target.imageName })) + needsDownload = !_.some availableImages, (image) => + image.dockerImageId == target.image or @images.isSameImage(image, { name: target.imageName }) dependenciesMetForStart = => @_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs), stepsInProgress) dependenciesMetForKill = => @@ -788,6 +847,8 @@ module.exports = class ApplicationManager extends EventEmitter _.merge(service, @_targetVolatilePerServiceId[service.serviceId]) return service return app + .then (apps) -> + return _.keyBy(apps, 'appId') getDependentTargets: => @proxyvisor.getTarget() @@ -840,15 +901,15 @@ module.exports = class ApplicationManager extends EventEmitter return notUsedForDelta and notUsedByProxyvisor return { imagesToSave, imagesToRemove } - _inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress) => + _inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress, ignoreImages) => Promise.try => - currentByAppId = _.keyBy(current.local.apps ? [], 'appId') - targetByAppId = _.keyBy(target.local.apps ? [], 'appId') + currentByAppId = current.local.apps ? {} + targetByAppId = target.local.apps ? {} nextSteps = [] if !supervisorNetworkReady nextSteps.push({ action: 'ensureSupervisorNetwork' }) else - if !_.some(stepsInProgress, (step) -> step.action == 'fetch') + if !ignoreImages and !_.some(stepsInProgress, (step) -> step.action == 'fetch') if cleanupNeeded nextSteps.push({ action: 'cleanup' }) { imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages) @@ -869,36 +930,38 @@ module.exports = class ApplicationManager extends EventEmitter !_.find(stepsInProgress, (s) -> _.isEqual(s, step))? _.uniqWith(withoutProgressDups, _.isEqual) - stopAll: ({ force = false } = {}) => + stopAll: ({ force = false, skipLock = false } = {}) => @services.getAll() .map (service) => - @_lockingIfNecessary service.appId, { force }, => + @_lockingIfNecessary service.appId, { force, skipLock }, => @services.kill(service, { removeContainer: false }) _lockingIfNecessary: (appId, { force = false, skipLock = false } = {}, fn) => if skipLock - return Promise.resolve() + return Promise.try( -> fn()) @config.get('lockOverride') .then (lockOverride) -> return checkTruthy(lockOverride) or force .then (force) -> updateLock.lock(appId, { force }, fn) - executeStepAction: (step, { force = false } = {}) => + executeStepAction: (step, { force = false, skipLock = false } = {}) => if _.includes(@proxyvisor.validActions, step.action) return @proxyvisor.executeStepAction(step) if !_.includes(@validActions, step.action) return Promise.reject(new Error("Invalid action #{step.action}")) - @actionExecutors[step.action](step, { force }) + @actionExecutors[step.action](step, { force, skipLock }) - getRequiredSteps: (currentState, targetState, stepsInProgress) => + getRequiredSteps: (currentState, targetState, stepsInProgress, ignoreImages = false) => Promise.join( @images.isCleanupNeeded() @images.getAvailable() @networks.supervisorNetworkReady() (cleanupNeeded, availableImages, supervisorNetworkReady) => - @_inferNextSteps(cleanupNeeded, availableImages, supervisorNetworkReady, currentState, targetState, stepsInProgress) + @_inferNextSteps(cleanupNeeded, availableImages, supervisorNetworkReady, currentState, targetState, stepsInProgress, ignoreImages) .then (nextSteps) => + if ignoreImages and _.some(nextSteps, (step) -> step.action == 'fetch') + throw new Error('Cannot fetch images while executing an API action') @proxyvisor.getRequiredSteps(availableImages, currentState, targetState, nextSteps.concat(stepsInProgress)) .then (proxyvisorSteps) -> return nextSteps.concat(proxyvisorSteps) diff --git a/src/device-state.coffee b/src/device-state.coffee index 527a9dd6..0be9b0d6 100644 --- a/src/device-state.coffee +++ b/src/device-state.coffee @@ -155,7 +155,7 @@ module.exports = class DeviceState extends EventEmitter constructor: ({ @db, @config, @eventTracker }) -> @logger = new Logger({ @eventTracker }) @deviceConfig = new DeviceConfig({ @db, @config, @logger }) - @applications = new ApplicationManager({ @config, @logger, @db, @eventTracker }) + @applications = new ApplicationManager({ @config, @logger, @db, @eventTracker, deviceState: this }) @on 'error', (err) -> console.error('Error in deviceState: ', err, err.stack) @_currentVolatile = {} @@ -164,11 +164,9 @@ module.exports = class DeviceState extends EventEmitter @_readLock = Promise.promisify(_lock.async.readLock) @lastSuccessfulUpdate = null @failedUpdates = 0 - @stepsInProgress = [] @applyInProgress = false @lastApplyStart = process.hrtime() @scheduledApply = null - @applyContinueScheduled = false @shuttingDown = false @_router = new DeviceStateRouter(this) @router = @_router.router @@ -309,8 +307,10 @@ module.exports = class DeviceState extends EventEmitter .then => @applications.setTarget(target.local.apps, target.dependent, trx) - getTarget: ({ initial = false } = {}) => + getTarget: ({ initial = false, intermediate = false } = {}) => @usingReadLockTarget => + if intermediate + return @intermediateTarget Promise.props({ local: Promise.props({ name: @config.get('name') @@ -394,16 +394,16 @@ module.exports = class DeviceState extends EventEmitter .catch (err) => @eventTracker.track('Loading preloaded apps failed', { error: err }) - reboot: (force) => - @applications.stopAll({ force }) + reboot: (force, skipLock) => + @applications.stopAll({ force, skipLock }) .then => @logger.logSystemMessage('Rebooting', {}, 'Reboot') device.reboot() .tap => @emit('shutdown') - shutdown: (force) => - @applications.stopAll({ force }) + shutdown: (force, skipLock) => + @applications.stopAll({ force, skipLock }) .then => @logger.logSystemMessage('Shutting down', {}, 'Shutdown') device.shutdown() @@ -411,70 +411,80 @@ module.exports = class DeviceState extends EventEmitter @shuttingDown = true @emitAsync('shutdown') - executeStepAction: (step, { force, initial }) => + executeStepAction: (step, { force, initial, skipLock }) => Promise.try => if _.includes(@deviceConfig.validActions, step.action) @deviceConfig.executeStepAction(step, { initial }) else if _.includes(@applications.validActions, step.action) - @applications.executeStepAction(step, { force }) + @applications.executeStepAction(step, { force, skipLock }) else switch step.action when 'reboot' - @reboot(force) + @reboot(force, skipLock) when 'shutdown' - @shutdown(force) + @shutdown(force, skipLock) when 'noop' Promise.resolve() else throw new Error("Invalid action #{step.action}") - applyStepAsync: (step, { force, initial }) => + applyStep: (step, { force, initial, intermediate, skipLock }, updateContext) => if @shuttingDown return - @stepsInProgress.push(step) - setImmediate => - @executeStepAction(step, { force, initial }) - .finally => - @usingInferStepsLock => - _.pullAllWith(@stepsInProgress, [ step ], _.isEqual) - .then (stepResult) => - @emitAsync('step-completed', null, step, stepResult) - @continueApplyTarget({ force, initial }) - .catch (err) => - @emitAsync('step-error', err, step) - @applyError(err, force, initial) + updateContext.stepsInProgress.push(step) + @executeStepAction(step, { force, initial, skipLock }) + .finally => + @usingInferStepsLock -> + _.pullAllWith(updateContext.stepsInProgress, [ step ], _.isEqual) + .catch (err) => + @emitAsync('step-error', err, step) + throw err + .then (stepResult) => + @emitAsync('step-completed', null, step, stepResult) + @continueApplyTarget({ force, initial, intermediate }, updateContext) - applyError: (err, force, initial) => - @_applyingSteps = false - @applyInProgress = false - @failedUpdates += 1 - @reportCurrentState(update_failed: true) - if @scheduledApply? - console.log('Updating failed, but there is already another update scheduled immediately: ', err) - else - delay = Math.min((2 ** @failedUpdates) * 500, 30000) - # If there was an error then schedule another attempt briefly in the future. - console.log('Scheduling another update attempt due to failure: ', delay, err) - @triggerApplyTarget({ force, delay, initial }) + applyError: (err, force, { initial, intermediate }) => + if !intermediate + @applyInProgress = false + @failedUpdates += 1 + @reportCurrentState(update_failed: true) + if @scheduledApply? + console.log('Updating failed, but there is already another update scheduled immediately: ', err) + else + delay = Math.min((2 ** @failedUpdates) * 500, 30000) + # If there was an error then schedule another attempt briefly in the future. + console.log('Scheduling another update attempt due to failure: ', delay, err) + @triggerApplyTarget({ force, delay, initial }) @emitAsync('apply-target-state-error', err) @emitAsync('apply-target-state-end', err) + throw err - applyTarget: ({ force = false, initial = false } = {}) => - console.log('Applying target state') - @usingInferStepsLock => - Promise.join( - @getCurrentForComparison() - @getTarget({ initial }) - (currentState, targetState) => - @deviceConfig.getRequiredSteps(currentState, targetState, @stepsInProgress) - .then (deviceConfigSteps) => - if !_.isEmpty(deviceConfigSteps) - return deviceConfigSteps - else - @applications.getRequiredSteps(currentState, targetState, @stepsInProgress) - ) - .then (steps) => - if _.isEmpty(steps) and _.isEmpty(@stepsInProgress) + applyTarget: ({ force = false, initial = false, intermediate = false, skipLock = false } = {}, updateContext) => + if !updateContext? + updateContext = { + stepsInProgress: [] + applyContinueScheduled: false + } + Promise.try => + if !intermediate + @applyBlocker + .then => + console.log('Applying target state') + @usingInferStepsLock => + Promise.join( + @getCurrentForComparison() + @getTarget({ initial, intermediate }) + (currentState, targetState) => + @deviceConfig.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress) + .then (deviceConfigSteps) => + if !_.isEmpty(deviceConfigSteps) + return deviceConfigSteps + else + @applications.getRequiredSteps(currentState, targetState, updateContext.stepsInProgress, intermediate) + ) + .then (steps) => + if _.isEmpty(steps) and _.isEmpty(updateContext.stepsInProgress) + if !intermediate console.log('Finished applying target state') @applyInProgress = false @applications.timeSpentFetching = 0 @@ -482,31 +492,38 @@ module.exports = class DeviceState extends EventEmitter @lastSuccessfulUpdate = Date.now() @reportCurrentState(update_failed: false, update_pending: false, update_downloaded: false) @emitAsync('apply-target-state-end', null) - return + return + if !intermediate @reportCurrentState(update_pending: true) - Promise.map steps, (step) => - @applyStepAsync(step, { force, initial }) + Promise.map steps, (step) => + @applyStep(step, { force, initial, intermediate, skipLock }, updateContext) .catch (err) => - @applyError(err, force, initial) + @applyError(err, force, { initial, intermediate }) - continueApplyTarget: ({ force = false, initial = false } = {}) => - if @applyContinueScheduled - return - @applyContinueScheduled = true - setTimeout( => - @applyContinueScheduled = false - @applyTarget({ force, initial }) - , 1000) - return + continueApplyTarget: ({ force = false, initial = false, intermediate = false } = {}, updateContext) => + Promise.try => + if !intermediate + @applyBlocker + .then => + if updateContext.applyContinueScheduled + return + updateContext.applyContinueScheduled = true + Promise.delay(1000) + .then => + updateContext.applyContinueScheduled = false + @applyTarget({ force, initial, intermediate }, updateContext) + + pauseNextApply: => + @applyBlocker = new Promise (resolve) => + @applyUnblocker = resolve + + resumeNextApply: => + @applyUnblocker?() - # TODO: Make this and applyTarget work purely with promises, no need to wait on events triggerApplyTarget: ({ force = false, delay = 0, initial = false } = {}) => if @applyInProgress if !@scheduledApply? @scheduledApply = { force, delay } - @once 'apply-target-state-end', => - @triggerApplyTarget(@scheduledApply) - @scheduledApply = null else # If a delay has been set it's because we need to hold off before applying again, # so we need to respect the maximum delay that has been passed @@ -514,8 +531,18 @@ module.exports = class DeviceState extends EventEmitter @scheduledApply.force or= force return @applyInProgress = true - setTimeout( => + Promise.delay(delay) + .then => @lastApplyStart = process.hrtime() @applyTarget({ force, initial }) - , delay) + .finally => + if @scheduledApply? + @triggerApplyTarget(@scheduledApply) + @scheduledApply = null return + + applyIntermediateTarget: (intermediateTarget, { force = false, skipLock = false } = {}) => + @intermediateTarget = _.cloneDeep(intermediateTarget) + @applyTarget({ intermediate: true, force, skipLock }) + .then => + @intermediateTarget = null