From b003f48d7ba226c2f91bed9993d9552490496122 Mon Sep 17 00:00:00 2001 From: Pablo Carranza Velez Date: Wed, 29 Nov 2017 13:32:57 -0800 Subject: [PATCH] Switch to using knex migrations to set up the database, and change the database format to use integers for ids instead of strings. Also includes various improvements and bugfixes to services and the migration from legacy /data to volumes. The switch ti migrations involves a dirty hack for webpack to properly resolve the paths to the migrations js files - it uses an expression that webpack can't resolve, so we hardcode it to a value and use the ContextReplacementPlugin to make that value resolve to the migrations folder. The downsides to this approach are: - a change in knex code would break this - the migration code is added twice to the supervisor image: once in the migrations folder (because knex needs to loop through the directory to find the files), and once inside app.js (because I can't make webpack treat them as external) Signed-off-by: Pablo Carranza Velez --- .dockerignore | 1 + Dockerfile | 2 +- hardcode-migrations.js | 7 + knexfile.js | 10 + package.json | 3 +- src/api-binder.coffee | 44 +- src/application-manager.coffee | 581 +++++++++--------- src/compose/images.coffee | 55 +- src/compose/networks.coffee | 24 +- src/compose/service-manager.coffee | 151 +++-- src/compose/service.coffee | 83 +-- src/compose/volumes.coffee | 61 +- src/config.coffee | 28 +- src/db.coffee | 206 +------ src/device-config.coffee | 66 +- src/device-state.coffee | 188 ++---- src/event-tracker.coffee | 6 +- src/lib/constants.coffee | 3 +- src/lib/docker-utils.coffee | 1 + src/lib/update-lock.coffee | 63 +- src/migrations/20171129013519_legacy.js | 136 ++++ .../20171129064057_multicontainer.js | 309 ++++++++++ src/proxyvisor.coffee | 161 ++--- src/supervisor-api.coffee | 3 +- src/supervisor.coffee | 23 +- webpack.config.js | 20 +- 26 files changed, 1221 insertions(+), 1014 deletions(-) create mode 100644 hardcode-migrations.js create mode 100644 knexfile.js create mode 100644 src/migrations/20171129013519_legacy.js create mode 100644 src/migrations/20171129064057_multicontainer.js diff --git a/.dockerignore b/.dockerignore index a17c3d5a..777dca89 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,3 +5,4 @@ node_modules tools dindctl README.md +knexfile.js diff --git a/Dockerfile b/Dockerfile index a67eaf58..2c946c23 100644 --- a/Dockerfile +++ b/Dockerfile @@ -138,7 +138,7 @@ COPY package.json /usr/src/app/ RUN JOBS=MAX npm install --no-optional --unsafe-perm -COPY webpack.config.js fix-jsonstream.js /usr/src/app/ +COPY webpack.config.js fix-jsonstream.js hardcode-migrations.js /usr/src/app/ COPY src /usr/src/app/src RUN npm run lint \ diff --git a/hardcode-migrations.js b/hardcode-migrations.js new file mode 100644 index 00000000..7708bf1b --- /dev/null +++ b/hardcode-migrations.js @@ -0,0 +1,7 @@ +// knex migrations use dynamic requires which break with webpack. +// This hack makes the migrations directory a constant so that at least we can use webpack contexts for the +// require. +module.exports = function (source) { + return source.toString().replace("require(directory + '/' + name);", "require('./migrations/' + name);") + .replace("require(_path2.default.join(this._absoluteConfigDir(), name));", "require('./migrations/' + name);") +} diff --git a/knexfile.js b/knexfile.js new file mode 100644 index 00000000..d845dbe9 --- /dev/null +++ b/knexfile.js @@ -0,0 +1,10 @@ +// Only used to be able to run "knex migrate:make " on the development machine. +// Not used in the supervisor. + +module.exports = { + client: 'sqlite3', + connection: { + filename: './database.sqlite' + }, + useNullAsDefault: true +} \ No newline at end of file diff --git a/package.json b/package.json index 5b5869da..ae9b952f 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "buffer-equal-constant-time": "^1.0.1", "coffee-loader": "^0.7.3", "coffee-script": "~1.11.0", + "copy-webpack-plugin": "^4.2.3", "docker-delta": "^2.0.4", "docker-progress": "^2.7.2", "docker-toolbelt": "^3.2.1", @@ -41,7 +42,6 @@ "memoizee": "^0.4.1", "mixpanel": "0.0.20", "mkdirp": "^0.5.1", - "ncp": "^2.0.0", "network-checker": "~0.0.5", "node-loader": "^0.6.0", "null-loader": "^0.1.1", @@ -52,7 +52,6 @@ "resin-register-device": "^3.0.0", "resin-sync": "^9.3.0", "resumable-request": "^2.0.0", - "rimraf": "^2.5.4", "rwlock": "^5.0.0", "semver": "^5.3.0", "semver-regex": "^1.0.0", diff --git a/src/api-binder.coffee b/src/api-binder.coffee index 119685c6..c017b9ee 100644 --- a/src/api-binder.coffee +++ b/src/api-binder.coffee @@ -26,9 +26,8 @@ class APIBinderRouter @router.use(bodyParser.json()) @router.post '/v1/update', (req, res) => @eventTracker.track('Update notification') - setImmediate => - if @apiBinder.readyForUpdates - @apiBinder.getAndSetTargetState(req.body.force) + if @apiBinder.readyForUpdates + @apiBinder.getAndSetTargetState(req.body.force) res.sendStatus(204) module.exports = class APIBinder @@ -65,7 +64,8 @@ module.exports = class APIBinder apiPrefix: baseUrlLegacy passthrough: requestOpts @cachedResinApi = @resinApi.clone({}, cache: {}) - return if !startServices + if !startServices + return console.log('Ensuring device is provisioned') @provisionDevice() .then => @@ -137,7 +137,8 @@ module.exports = class APIBinder _provision: => @config.get('provisioningOptions') .then (opts) => - return if opts.registered_at? and opts.deviceId? and !opts.provisioningApiKey? + if opts.registered_at? and opts.deviceId? and !opts.provisioningApiKey? + return Promise.try -> if opts.registered_at? and !opts.deviceId? console.log('Device is registered but no device id available, attempting key exchange') @@ -173,7 +174,8 @@ module.exports = class APIBinder @_provisionOrRetry(retryDelay) provisionDevice: => - throw new Error('Trying to provision device without initializing API client') if !@resinApi? + if !@resinApi? + throw new Error('Trying to provision device without initializing API client') @config.getMany([ 'provisioned' 'bootstrapRetryDelay' @@ -193,8 +195,10 @@ module.exports = class APIBinder 'deviceId' ]) .then (conf) => - throw new Error('Cannot provision dependent device in offline mode') if conf.offlineMode - throw new Error('Device must be provisioned to provision a dependent device') if !conf.provisioned + if conf.offlineMode + throw new Error('Cannot provision dependent device in offline mode') + if !conf.provisioned + throw new Error('Device must be provisioned to provision a dependent device') # TODO: when API supports it as per https://github.com/resin-io/hq/pull/949 remove userId _.defaults(device, { user: conf.userId @@ -219,8 +223,10 @@ module.exports = class APIBinder 'apiTimeout' ]) .then (conf) => - throw new Error('Cannot update dependent device in offline mode') if conf.offlineMode - throw new Error('Device must be provisioned to update a dependent device') if !conf.provisioned + if conf.offlineMode + throw new Error('Cannot update dependent device in offline mode') + if !conf.provisioned + throw new Error('Device must be provisioned to update a dependent device') @resinApiLegacy.patch resource: 'device' id: id @@ -229,7 +235,6 @@ module.exports = class APIBinder apikey: conf.currentApiKey .timeout(conf.apiTimeout) - # TODO: change to the multicontainer model, I think it's device_configuration_variable? # Creates the necessary config vars in the API to match the current device state, # without overwriting any variables that are already set. _reportInitialEnv: => @@ -301,10 +306,12 @@ module.exports = class APIBinder return startTargetStatePoll: -> - throw new Error('Trying to start poll without initializing API client') if !@resinApi? + if !@resinApi? + throw new Error('Trying to start poll without initializing API client') @_pollTargetState() @config.on 'change', (changedConfig) => - @_pollTargetState() if changedConfig.appUpdatePollInterval? + if changedConfig.appUpdatePollInterval? + @_pollTargetState() _getStateDiff: => diff = { @@ -325,7 +332,6 @@ module.exports = class APIBinder @cachedResinApi._request(requestParams) - # TODO: switch to using the proper endpoint by changing @_reportV1 to @_reportV2 _report: => @config.getMany([ 'currentApiKey', 'deviceId', 'apiTimeout', 'resinApiEndpoint', 'uuid' ]) .then (conf) => @@ -339,6 +345,7 @@ module.exports = class APIBinder _.assign(@lastReportedState.dependent, stateDiff.dependent) _reportCurrentState: => + @reportPending = true @deviceState.getStatus() .then (currentDeviceState) => _.assign(@stateForReport.local, currentDeviceState.local) @@ -350,19 +357,20 @@ module.exports = class APIBinder @_report() .delay(REPORT_SUCCESS_DELAY) .then => - setImmediate(@_reportCurrentState) + @_reportCurrentState() .catch (err) => @eventTracker.track('Device state report failure', { error: err }) Promise.delay(REPORT_RETRY_DELAY) .then => - setImmediate(@_reportCurrentState) + @_reportCurrentState() + return startCurrentStateReport: => - throw new Error('Trying to start state reporting without initializing API client') if !@resinApi? + if !@resinApi? + throw new Error('Trying to start state reporting without initializing API client') # patch to the device(id) endpoint @deviceState.on 'change', => if !@reportPending - @reportPending = true # A latency of 100 ms should be acceptable and # allows avoiding catching docker at weird states @_reportCurrentState() diff --git a/src/application-manager.coffee b/src/application-manager.coffee index 4b61219f..3bdbc70a 100644 --- a/src/application-manager.coffee +++ b/src/application-manager.coffee @@ -21,7 +21,8 @@ Proxyvisor = require './proxyvisor' serviceAction = (action, serviceId, current, target, options) -> obj = { action, serviceId, current, target } - obj.options = options if options? + if options? + obj.options = options return obj # TODO: move this to an Image class? @@ -31,8 +32,8 @@ imageForService = (service) -> appId: service.appId serviceId: service.serviceId serviceName: service.serviceName - imageId: service.imageId?.toString() - releaseId: service.releaseId?.toString() + imageId: service.imageId + releaseId: service.releaseId dependent: 0 } @@ -52,7 +53,7 @@ class ApplicationManagerRouter @router.use(bodyParser.json()) @router.post '/v1/restart', (req, res) => - appId = checkString(req.body.appId) + appId = checkInt(req.body.appId) force = checkTruthy(req.body.force) @eventTracker.track('Restart container (v1)', { appId }) if !appId? @@ -60,8 +61,10 @@ class ApplicationManagerRouter @applications.getCurrentApp(appId) .then (app) => service = app?.services?[0] - return res.status(400).send('App not found') if !service? - return res.status(400).send('v1 endpoints are only allowed on single-container apps') if app.services.length > 1 + if !service? + return res.status(400).send('App not found') + if app.services.length > 1 + return res.status(400).send('v1 endpoints are only allowed on single-container apps') @applications.executeStepAction(serviceAction('restart', service.serviceId, service, service), { force }) .then -> res.status(200).send('OK') @@ -69,15 +72,17 @@ class ApplicationManagerRouter res.status(503).send(err?.message or err or 'Unknown error') @router.post '/v1/apps/:appId/stop', (req, res) => - appId = checkString(req.params.appId) + appId = checkInt(req.params.appId) force = checkTruthy(req.body.force) if !appId? return res.status(400).send('Missing app id') @applications.getCurrentApp(appId) .then (app) => service = app?.services?[0] - return res.status(400).send('App not found') if !service? - return res.status(400).send('v1 endpoints are only allowed on single-container apps') if app.services.length > 1 + if !service? + return res.status(400).send('App not found') + if app.services.length > 1 + return res.status(400).send('v1 endpoints are only allowed on single-container apps') @applications.setTargetVolatileForService(service.serviceId, running: false) @applications.executeStepAction(serviceAction('stop', service.serviceId, service), { force }) .then (service) -> @@ -86,15 +91,17 @@ class ApplicationManagerRouter res.status(503).send(err?.message or err or 'Unknown error') @router.post '/v1/apps/:appId/start', (req, res) => - appId = checkString(req.params.appId) + appId = checkInt(req.params.appId) force = checkTruthy(req.body.force) if !appId? return res.status(400).send('Missing app id') @applications.getCurrentApp(appId) .then (app) => service = app?.services?[0] - return res.status(400).send('App not found') if !service? - return res.status(400).send('v1 endpoints are only allowed on single-container apps') if app.services.length > 1 + if !service? + return res.status(400).send('App not found') + if app.services.length > 1 + return res.status(400).send('v1 endpoints are only allowed on single-container apps') @applications.setTargetVolatileForService(service.serviceId, running: true) @applications.executeStepAction(serviceAction('start', service.serviceId, null, service), { force }) .then (service) -> @@ -103,15 +110,17 @@ class ApplicationManagerRouter res.status(503).send(err?.message or err or 'Unknown error') @router.get '/v1/apps/:appId', (req, res) => - appId = checkString(req.params.appId) + appId = checkInt(req.params.appId) @eventTracker.track('GET app (v1)', appId) if !appId? return res.status(400).send('Missing app id') @applications.getCurrentApp(appId) .then (app) -> service = app?.services?[0] - return res.status(400).send('App not found') if !service? - return res.status(400).send('v1 endpoints are only allowed on single-container apps') if app.services.length > 1 + if !service? + return res.status(400).send('App not found') + if app.services.length > 1 + return res.status(400).send('v1 endpoints are only allowed on single-container apps') # Don't return data that will be of no use to the user appToSend = { appId @@ -126,19 +135,21 @@ class ApplicationManagerRouter res.status(503).send(err?.message or err or 'Unknown error') @router.post '/v1/purge', (req, res) => - appId = checkString(req.body.appId) + appId = checkInt(req.body.appId) force = checkTruthy(req.body.force) if !appId? errMsg = "App not found: an app needs to be installed for purge to work. If you've recently moved this device from another app, please push an app and wait for it to be installed first." return res.status(400).send(errMsg) - Promise.using updateLock.lock(appId, { force }), => + @_lockingIfNecessary appId, { force }, => @applications.getCurrentApp(appId) .then (app) => service = app?.services?[0] - return res.status(400).send('App not found') if !service? - return res.status(400).send('v1 endpoints are only allowed on single-container apps') if app.services.length > 1 + if !service? + return res.status(400).send('App not found') + if app.services.length > 1 + return res.status(400).send('v1 endpoints are only allowed on single-container apps') @applications.executeStepAction(serviceAction('kill', service.serviceId, service, null, skipLock: true), { force }) .then => @applications.executeStepAction({ @@ -164,21 +175,75 @@ module.exports = class ApplicationManager extends EventEmitter @volumes = new Volumes({ @docker, @logger }) @proxyvisor = new Proxyvisor({ @config, @logger, @db, @docker, @images, applications: this }) @_targetVolatilePerServiceId = {} - @validActions = [ - 'kill' - 'start' - 'stop' - 'updateReleaseId' - 'fetch' - 'removeImage' - 'updateImage' - 'killAll' - 'purge' - 'restart' - 'cleanup' - 'createNetworkOrVolume' - 'removeNetworkOrVolume' - ].concat(@proxyvisor.validActions) + @actionExecutors = { + stop: (step, { force = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + @services.kill(step.current, { removeContainer: false }) + kill: (step, { force = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + @services.kill(step.current) + .then => + if step.options?.removeImage + @images.remove(imageForService(step.current)) + updateMetadata: (step) => + @services.updateMetadata(step.current, step.target) + purge: (step, { force = false } = {}) => + appId = step.appId + @logger.logSystemMessage("Purging data for app #{appId}", { appId }, 'Purge data') + @_lockingIfNecessary appId, { force, skipLock: step.options?.skipLock }, => + @getCurrentApp(appId) + .then (app) => + if !_.isEmpty(app?.services) + throw new Error('Attempt to purge app with running services') + if _.isEmpty(app?.volumes) + @logger.logSystemMessage('No volumes to purge', { appId }, 'Purge data noop') + return + Promise.mapSeries _.toPairs(app.volumes ? {}), ([ name, config ]) => + @volumes.remove({ name }) + .then => + @volumes.create({ name, config, appId }) + .then => + @logger.logSystemMessage('Purged data', { appId }, 'Purge data success') + .catch (err) => + @logger.logSystemMessage("Error purging data: #{err}", { appId, error: err }, 'Purge data error') + throw err + restart: (step, { force = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + Promise.try => + @services.kill(step.current) + .then => + @services.start(step.target) + stopAll: (step, { force = false } = {}) => + @stopAll({ force }) + start: (step) => + @services.start(step.target) + handover: (step, { force = false } = {}) => + @_lockingIfNecessary step.current.appId, { force, skipLock: step.options?.skipLock }, => + @services.handover(step.current, step.target) + fetch: (step) => + Promise.join( + @config.get('fetchOptions') + @images.getAvailable() + (opts, availableImages) => + opts.deltaSource = @bestDeltaSource(step.image, availableImages) + @images.fetch(step.image, opts) + ) + .finally => + @reportCurrentState(update_downloaded: true) + removeImage: (step) => + @images.remove(step.image) + updateImage: (step) => + @images.update(step.target) + cleanup: (step) => + @images.cleanup() + createNetworkOrVolume: (step) => + model = if step.model is 'volume' then @volumes else @networks + model.create(step.target) + removeNetworkOrVolume: (step) => + model = if step.model is 'volume' then @volumes else @networks + model.remove(step.current) + } + @validActions = _.keys(@actionExecutors).concat(@proxyvisor.validActions) @_router = new ApplicationManagerRouter(this) @router = @_router.router @@ -210,7 +275,7 @@ module.exports = class ApplicationManager extends EventEmitter commit = null # We iterate over the current running services and add them to the current state # of the app they belong to. - _.forEach services, (service) -> + for service in services appId = service.appId apps[appId] ?= {} apps[appId].services ?= {} @@ -225,22 +290,25 @@ module.exports = class ApplicationManager extends EventEmitter else # There's two containers with the same imageId, so this has to be a handover previousReleaseId = apps[appId].services[service.imageId].releaseId - apps[appId].services[service.imageId].releaseId = Math.max(parseInt(previousReleaseId), parseInt(service.releaseId)).toString() + apps[appId].services[service.imageId].releaseId = Math.max(previousReleaseId, service.releaseId) apps[appId].services[service.imageId].status = 'Handing over' - _.forEach images, (image) -> + for image in images appId = image.appId if !image.dependent apps[appId] ?= {} apps[appId].services ?= {} - apps[appId].services[image.imageId] ?= _.pick(image, [ 'status', 'download_progress', 'releaseId' ]) + apps[appId].services[image.imageId] ?= _.pick(image, [ 'status', 'releaseId' ]) + apps[appId].services[image.imageId].download_progress = image.downloadProgress else dependent[appId] ?= {} dependent[appId].images ?= {} - dependent[appId].images[image.imageId] = _.pick(image, [ 'status', 'download_progress' ]) + dependent[appId].images[image.imageId] = _.pick(image, [ 'status' ]) + dependent[appId].images[image.imageId].download_progress = image.downloadProgress obj = { local: apps, dependent } - obj.commit = commit if commit + if commit? + obj.commit = commit return obj ) @@ -252,18 +320,18 @@ module.exports = class ApplicationManager extends EventEmitter # We iterate over the current running services and add them to the current state # of the app they belong to. - _.forEach services, (service) -> + for service in services appId = service.appId apps[appId].services ?= [] apps[appId].services.push(service) - _.forEach networks, (network) -> + for network in networks appId = network.appId apps[appId] ?= { appId } apps[appId].networks ?= {} apps[appId].networks[network.name] = network.config - _.forEach volumes, (volume) -> + for volume in volumes appId = volume.appId apps[appId] ?= { appId } apps[appId].volumes ?= {} @@ -293,114 +361,108 @@ module.exports = class ApplicationManager extends EventEmitter getTargetApp: (appId) => @db.models('app').where({ appId }).select() .then ([ app ]) => - return if !app? + if !app? + return @normaliseAndExtendAppFromDB(app) # Compares current and target services and returns a list of service pairs to be updated/removed/installed. # The returned list is an array of objects where the "current" and "target" properties define the update pair, and either can be null # (in the case of an install or removal). compareServicesForUpdate: (currentServices, targetServices) -> - Promise.try -> - removePairs = [] - installPairs = [] - updatePairs = [] - targetServiceIds = _.map(targetServices, 'serviceId') - currentServiceIds = _.uniq(_.map(currentServices, 'serviceId')) + removePairs = [] + installPairs = [] + updatePairs = [] + targetServiceIds = _.map(targetServices, 'serviceId') + currentServiceIds = _.uniq(_.map(currentServices, 'serviceId')) - toBeRemoved = _.difference(currentServiceIds, targetServiceIds) - _.forEach toBeRemoved, (serviceId) -> - servicesToRemove = _.filter(currentServices, (s) -> s.serviceId == serviceId) - _.map servicesToRemove, (service) -> + toBeRemoved = _.difference(currentServiceIds, targetServiceIds) + for serviceId in toBeRemoved + servicesToRemove = _.filter(currentServices, (s) -> s.serviceId == serviceId) + for service in servicesToRemove + removePairs.push({ + current: service + target: null + serviceId + }) + + toBeInstalled = _.difference(targetServiceIds, currentServiceIds) + for serviceId in toBeInstalled + serviceToInstall = _.find(targetServices, (s) -> s.serviceId == serviceId) + if serviceToInstall? + installPairs.push({ + current: null + target: serviceToInstall + serviceId + }) + + toBeMaybeUpdated = _.intersection(targetServiceIds, currentServiceIds) + currentServicesPerId = {} + targetServicesPerId = _.keyBy(targetServices, 'serviceId') + for serviceId in toBeMaybeUpdated + currentServiceContainers = _.filter currentServices, (service) -> + return service.serviceId == serviceId + if currentServiceContainers.length > 1 + currentServicesPerId[serviceId] = _.maxBy(currentServiceContainers, 'createdAt') + # All but the latest container for this service are spurious and should be removed + for service in _.without(currentServiceContainers, currentServicesPerId[serviceId]) removePairs.push({ current: service target: null serviceId }) + else + currentServicesPerId[serviceId] = currentServiceContainers[0] - toBeInstalled = _.difference(targetServiceIds, currentServiceIds) - _.forEach toBeInstalled, (serviceId) -> - servicesToInstall = _.filter(targetServices, (s) -> s.serviceId == serviceId) - _.map servicesToInstall, (service) -> - installPairs.push({ - current: null - target: service - serviceId - }) - - toBeMaybeUpdated = _.intersection(targetServiceIds, currentServiceIds) - currentServicesPerId = {} - targetServicesPerId = {} - _.forEach toBeMaybeUpdated, (serviceId) -> - currentServiceContainers = _.filter currentServices, (service) -> - return service.serviceId == serviceId - targetServicesForId = _.filter targetServices, (service) -> - return service.serviceId == serviceId - throw new Error("Target state includes multiple services with serviceId #{serviceId}") if targetServicesForId.length > 1 - targetServicesPerId[serviceId] = targetServicesForId[0] - if currentServiceContainers.length > 1 - currentServicesPerId[serviceId] = _.maxBy(currentServiceContainers, 'createdAt') - # All but the latest container for this service are spurious and should be removed - _.forEach _.without(currentServiceContainers, currentServicesPerId[serviceId]), (service) -> - removePairs.push({ - current: service - target: null - serviceId - }) - else - currentServicesPerId[serviceId] = currentServiceContainers[0] - - Promise.filter toBeMaybeUpdated, (serviceId) -> - return !currentServicesPerId[serviceId].isEqual(targetServicesPerId[serviceId]) - .map (serviceId) -> - updatePairs.push({ - current: currentServicesPerId[serviceId] - target: targetServicesPerId[serviceId] - serviceId - }) - .then -> - return { removePairs, installPairs, updatePairs } + needUpdate = _.filter toBeMaybeUpdated, (serviceId) -> + return !currentServicesPerId[serviceId].isEqual(targetServicesPerId[serviceId]) + for serviceId in needUpdate + updatePairs.push({ + current: currentServicesPerId[serviceId] + target: targetServicesPerId[serviceId] + serviceId + }) + return { removePairs, installPairs, updatePairs } compareNetworksOrVolumesForUpdate: (model, { current, target }, appId) -> - Promise.try -> - outputPairs = [] - currentNames = _.keys(current) - targetNames = _.keys(target) - toBeRemoved = _.difference(currentNames, targetNames) - _.forEach toBeRemoved, (name) -> - outputPairs.push({ - current: { - name - appId - config: current[name] - } - target: null - }) - toBeInstalled = _.difference(targetNames, currentNames) - _.forEach toBeInstalled, (name) -> - outputPairs.push({ - current: null - target: { - name - appId - config: target[name] - } - }) - toBeUpdated = _.filter _.intersection(targetNames, currentNames), (name) -> - !model.isEqualConfig(current[name], target[name]) - _.forEach toBeUpdated, (name) -> - outputPairs.push({ - current: { - name - appId - config: current[name] - } - target: { - name - appId - config: target[name] - } - }) - return outputPairs + outputPairs = [] + currentNames = _.keys(current) + targetNames = _.keys(target) + toBeRemoved = _.difference(currentNames, targetNames) + for name in toBeRemoved + outputPairs.push({ + current: { + name + appId + config: current[name] + } + target: null + }) + toBeInstalled = _.difference(targetNames, currentNames) + for name in toBeInstalled + outputPairs.push({ + current: null + target: { + name + appId + config: target[name] + } + }) + toBeUpdated = _.filter _.intersection(targetNames, currentNames), (name) -> + !model.isEqualConfig(current[name], target[name]) + for name in toBeUpdated + outputPairs.push({ + current: { + name + appId + config: current[name] + } + target: { + name + appId + config: target[name] + } + }) + return outputPairs # TODO: should we consider the case where several services use the same image? # In such case we should do more complex matching to allow several image objects @@ -409,24 +471,26 @@ module.exports = class ApplicationManager extends EventEmitter compareImagesForMetadataUpdate: (availableImages, targetServices) -> pairs = [] targetImages = _.map(targetServices, imageForService) - _.forEach targetImages, (target) -> - imageWithSameName = _.find(availableImages, (img) -> img.name == target.name) - return if !imageWithSameName? - return if _.find(availableImages, (img) -> _.isEqual(_.omit(img, 'id'), target)) - pairs.push({ current: imageWithSameName, target, serviceId: target.serviceId }) + for target in targetImages + imageWithSameContent = _.find(availableImages, (img) => @images.isSameImage(img, target)) + if imageWithSameContent? and !_.find(availableImages, (img) -> _.isEqual(_.omit(img, 'id'), target)) + pairs.push({ current: imageWithSameContent, target, serviceId: target.serviceId }) return pairs # Checks if a service is using a network or volume that is about to be updated _hasCurrentNetworksOrVolumes: (service, networkPairs, volumePairs) -> - return false if !service? + if !service? + return false hasNetwork = _.some networkPairs, (pair) -> pair.current.name == service.network_mode - return true if hasNetwork + if hasNetwork + return true hasVolume = _.some service.volumes, (volume) -> name = _.split(volume, ':')[0] _.some volumePairs, (pair) -> pair.current.name == name - return true if hasVolume + if hasVolume + return true return false # TODO: account for volumes-from, networks-from, links, etc @@ -435,25 +499,28 @@ module.exports = class ApplicationManager extends EventEmitter # for depends_on, check no install or update pairs have that service dependencyUnmet = _.some target.depends_on ? [], (dependency) -> _.find(pendingPairs, (pair) -> pair.target?.serviceName == dependency)? or _.find(stepsInProgress, (step) -> step.target?.serviceName == dependency)? - return false if dependencyUnmet - # for networks and volumes, check no network pairs have that volume name - if _.find(networkPairs, (pair) -> pair.target.name == target.network_mode)? + if dependencyUnmet return false - if _.find(stepsInProgress, (step) -> step.model == 'network' and step.target.name == target.network_mode)? + # for networks and volumes, check no network pairs have that volume name + if _.find(networkPairs, (pair) -> pair.target?.name == target.network_mode)? + return false + if _.find(stepsInProgress, (step) -> step.model == 'network' and step.target?.name == target.network_mode)? return false volumeUnmet = _.some target.volumes, (volumeDefinition) -> - sourceName = volumeDefinition.split(':')[0] - _.find(volumePairs, (pair) -> pair.target.name == sourceName)? or _.find(stepsInProgress, (step) -> step.model == 'volume' and step.target.name == sourceName)? + [ sourceName, destName ] = volumeDefinition.split(':') + if !destName? # If this is not a named volume, ignore it + return false + _.find(volumePairs, (pair) -> pair.target?.name == sourceName)? or _.find(stepsInProgress, (step) -> step.model == 'volume' and step.target.name == sourceName)? return !volumeUnmet # Unless the update strategy requires an early kill (i.e. kill-then-download, delete-then-download), we only want # to kill a service once the images for the services it depends on have been downloaded, so as to minimize # downtime (but not block the killing too much, potentially causing a deadlock) - _dependenciesMetForServiceKill: (target, targetApp, availableImages) -> + _dependenciesMetForServiceKill: (target, targetApp, availableImages) => if target.depends_on? for dependency in target.depends_on dependencyService = _.find(targetApp.services, (s) -> s.serviceName == dependency) - if !_.find(availableImages, (image) -> image.name == dependencyService.image)? + if !_.find(availableImages, (image) => @images.isSameImage(image, { name: dependencyService.image }))? return false return true @@ -468,7 +535,7 @@ module.exports = class ApplicationManager extends EventEmitter # If the current update doesn't require killing the services that use this network/volume, # we have to kill them before removing the network/volume (e.g. when we're only updating the network config) steps = [] - _.forEach dependencies, (dependency) -> + for dependency in dependencies if !_.some(changingPairs, (pair) -> pair.serviceId == dependency.serviceId) steps.push(serviceAction('kill', dependency.serviceId, dependency)) return steps @@ -490,8 +557,8 @@ module.exports = class ApplicationManager extends EventEmitter # Infers steps that do not require creating a new container _updateContainerStep: (current, target) -> - if current.releaseId != target.releaseId - return serviceAction('updateReleaseId', target.serviceId, current, target) + if current.releaseId != target.releaseId or current.imageId != target.imageId + return serviceAction('updateMetadata', target.serviceId, current, target) else if target.running return serviceAction('start', target.serviceId, current, target) else @@ -529,13 +596,13 @@ module.exports = class ApplicationManager extends EventEmitter return null } - _nextStepForService: ({ current, target }, updateContext) -> + _nextStepForService: ({ current, target }, updateContext) => { targetApp, networkPairs, volumePairs, installPairs, updatePairs, stepsInProgress, availableImages } = updateContext if _.find(stepsInProgress, (step) -> step.serviceId == target.serviceId)? # There is already a step in progress for this service, so we wait return null - needsDownload = !_.some(availableImages, (image) -> target.image == image.name) + needsDownload = !_.some(availableImages, (image) => @images.isSameImage(image, { name: target.image })) dependenciesMetForStart = => @_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs), stepsInProgress) dependenciesMetForKill = => @@ -554,7 +621,8 @@ module.exports = class ApplicationManager extends EventEmitter else strategy = checkString(target.labels['io.resin.update.strategy']) validStrategies = [ 'download-then-kill', 'kill-then-download', 'delete-then-download', 'hand-over' ] - strategy = 'download-then-kill' if !_.includes(validStrategies, strategy) + if !_.includes(validStrategies, strategy) + strategy = 'download-then-kill' timeout = checkInt(target.labels['io.resin.update.handover_timeout']) return @_strategySteps[strategy](current, target, needsDownload, dependenciesMetForStart, dependenciesMetForKill, needsSpecialKill, timeout) @@ -562,45 +630,44 @@ module.exports = class ApplicationManager extends EventEmitter emptyApp = { services: [], volumes: {}, networks: {} } if !targetApp? targetApp = emptyApp + else + # Create the default network for the target app + targetApp.networks[targetApp.appId] ?= {} if !currentApp? currentApp = emptyApp appId = targetApp.appId ? currentApp.appId - # Create the default network for the target app - targetApp.networks[targetApp.appId] ?= {} - Promise.join( - @compareNetworksOrVolumesForUpdate(@networks, { current: currentApp.networks, target: targetApp.networks }, appId) - @compareNetworksOrVolumesForUpdate(@volumes, { current: currentApp.volumes, target: targetApp.volumes }, appId) - @compareServicesForUpdate(currentApp.services, targetApp.services) - @compareImagesForMetadataUpdate(availableImages, targetApp.services) - (networkPairs, volumePairs, { removePairs, installPairs, updatePairs }, imagePairs) => - steps = [] - # All removePairs get a 'kill' action - _.forEach removePairs, ({ current }) -> - steps.push(serviceAction('kill', current.serviceId, current, null)) - # next step for install pairs in download - start order, but start requires dependencies, networks and volumes met - # next step for update pairs in order by update strategy. start requires dependencies, networks and volumes met. - _.forEach installPairs.concat(updatePairs), (pair) => - step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, stepsInProgress, availableImages }) - steps.push(step) if step? - # next step for network pairs - remove requires services killed, create kill if no pairs or steps affect that service - _.forEach networkPairs, (pair) => - pairSteps = @_nextStepsForNetwork(pair, currentApp, removePairs.concat(updatePairs)) - steps = steps.concat(pairSteps) if !_.isEmpty(pairSteps) - # next step for volume pairs - remove requires services killed, create kill if no pairs or steps affect that service - _.forEach volumePairs, (pair) => - pairSteps = @_nextStepsForVolume(pair, currentApp, removePairs.concat(updatePairs)) - steps = steps.concat(pairSteps) if !_.isEmpty(pairSteps) - _.forEach imagePairs, (pair) -> - steps.push(_.assign({ action: 'updateImage' }, pair)) - return steps - ) + networkPairs = @compareNetworksOrVolumesForUpdate(@networks, { current: currentApp.networks, target: targetApp.networks }, appId) + volumePairs = @compareNetworksOrVolumesForUpdate(@volumes, { current: currentApp.volumes, target: targetApp.volumes }, appId) + { removePairs, installPairs, updatePairs } = @compareServicesForUpdate(currentApp.services, targetApp.services) + imagePairs = @compareImagesForMetadataUpdate(availableImages, targetApp.services) + steps = [] + # All removePairs get a 'kill' action + for pair in removePairs + steps.push(serviceAction('kill', pair.current.serviceId, pair.current, null)) + # next step for install pairs in download - start order, but start requires dependencies, networks and volumes met + # next step for update pairs in order by update strategy. start requires dependencies, networks and volumes met. + for pair in installPairs.concat(updatePairs) + step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, stepsInProgress, availableImages }) + if step? + steps.push(step) + # next step for network pairs - remove requires services killed, create kill if no pairs or steps affect that service + for pair in networkPairs + pairSteps = @_nextStepsForNetwork(pair, currentApp, removePairs.concat(updatePairs)) + steps = steps.concat(pairSteps) + # next step for volume pairs - remove requires services killed, create kill if no pairs or steps affect that service + for pair in volumePairs + pairSteps = @_nextStepsForVolume(pair, currentApp, removePairs.concat(updatePairs)) + steps = steps.concat(pairSteps) + for pair in imagePairs + steps.push(_.assign({ action: 'updateImage' }, pair)) + return steps normaliseAppForDB: (app) => services = _.map app.services, (s, serviceId) -> service = _.clone(s) service.appId = app.appId service.releaseId = app.releaseId - service.serviceId = serviceId + service.serviceId = checkInt(serviceId) service.commit = app.commit return service Promise.map services, (service) => @@ -632,7 +699,8 @@ module.exports = class ApplicationManager extends EventEmitter .catchReturn(@docker.InvalidNetGatewayError, null) (imageInfo, apiHostForNetwork) -> serviceOpts.imageInfo = imageInfo - serviceOpts.supervisorApiHost = apiHostForNetwork if apiHostForNetwork? + if apiHostForNetwork? + serviceOpts.supervisorApiHost = apiHostForNetwork return new Service(service, serviceOpts) ) @@ -655,9 +723,9 @@ module.exports = class ApplicationManager extends EventEmitter Promise.map(JSON.parse(app.services), (service) => @createTargetService(service, configOpts)) .then (services) -> # If a named volume is defined in a service, we add it app-wide so that we can track it and purge it - _.forEach services, (s) -> + for s in services serviceNamedVolumes = s.getNamedVolumes() - _.forEach serviceNamedVolumes, (name) -> + for name in serviceNamedVolumes volumes[name] ?= { labels: {} } outApp = { appId: app.appId @@ -674,17 +742,16 @@ module.exports = class ApplicationManager extends EventEmitter setTarget: (apps, dependent , trx) => setInTransaction = (trx) => Promise.try => - if apps? - appsArray = _.map apps, (app, appId) -> - appClone = _.clone(app) - appClone.appId = appId - return appClone - Promise.map(appsArray, @normaliseAppForDB) - .then (appsForDB) => - Promise.map appsForDB, (app) => - @db.upsertModel('app', app, { appId: app.appId }, trx) - .then -> - trx('app').whereNotIn('appId', _.map(appsForDB, 'appId')).del() + appsArray = _.map apps, (app, appId) -> + appClone = _.clone(app) + appClone.appId = checkInt(appId) + return appClone + Promise.map(appsArray, @normaliseAppForDB) + .then (appsForDB) => + Promise.map appsForDB, (app) => + @db.upsertModel('app', app, { appId: app.appId }, trx) + .then -> + trx('app').whereNotIn('appId', _.map(appsForDB, 'appId')).del() .then => @proxyvisor.setTargetInTransaction(dependent, trx) @@ -705,7 +772,8 @@ module.exports = class ApplicationManager extends EventEmitter .map (app) => if !_.isEmpty(app.services) app.services = _.map app.services, (service) => - _.merge(service, @_targetVolatilePerServiceId[service.serviceId]) if @_targetVolatilePerServiceId[service.serviceId]? + if @_targetVolatilePerServiceId[service.serviceId]? + _.merge(service, @_targetVolatilePerServiceId[service.serviceId]) return service return app @@ -733,21 +801,21 @@ module.exports = class ApplicationManager extends EventEmitter allImagesForApp = (app) -> _.map(app.services, imageForService) - currentImages = _.flatten(_.map(current.local?.apps, allImagesForApp)) - targetImages = _.flatten(_.map(target.local?.apps, allImagesForApp)) - availableAndUnused = _.filter available, (image) -> - !_.some currentImages.concat(targetImages), (imageInUse) -> image.name == imageInUse.name - imagesToDownload = _.filter targetImages, (imageName) -> - !_.some available, (availableImage) -> availableImage.name == imageName.name + currentImages = _.flatten(_.map(current.local.apps, allImagesForApp)) + targetImages = _.flatten(_.map(target.local.apps, allImagesForApp)) + availableAndUnused = _.filter available, (image) => + !_.some currentImages.concat(targetImages), (imageInUse) => @images.isSameImage(image, imageInUse) + imagesToDownload = _.filter targetImages, (targetImage) => + !_.some available, (availableImage) => @images.isSameImage(availableImage, targetImage) deltaSources = _.map imagesToDownload, (image) => return @bestDeltaSource(image, available) proxyvisorImages = @proxyvisor.imagesInUse(current, target) - return _.filter availableAndUnused, (image) -> + return _.filter availableAndUnused, (image) => notUsedForDelta = !_.some deltaSources, (deltaSource) -> deltaSource == image.name - notUsedByProxyvisor = !_.some proxyvisorImages, (proxyvisorImage) -> image.name == proxyvisorImage + notUsedByProxyvisor = !_.some proxyvisorImages, (proxyvisorImage) => @images.isSameImage(image, { name: proxyvisorImage }) return notUsedForDelta and notUsedByProxyvisor _inferNextSteps: (cleanupNeeded, availableImages, current, target, stepsInProgress) => @@ -759,17 +827,14 @@ module.exports = class ApplicationManager extends EventEmitter if cleanupNeeded nextSteps.push({ action: 'cleanup' }) imagesToRemove = @_unnecessaryImages(current, target, availableImages) - _.forEach imagesToRemove, (image) -> + for image in imagesToRemove nextSteps.push({ action: 'removeImage', image }) # If we have to remove any images, we do that before anything else - return nextSteps if !_.isEmpty(nextSteps) - allAppIds = _.union(_.keys(currentByAppId), _.keys(targetByAppId)) - Promise.map allAppIds, (appId) => - @_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], availableImages, stepsInProgress) - .then (nextStepsForThisApp) -> - nextSteps = nextSteps.concat(nextStepsForThisApp) - .then => - return @_removeDuplicateSteps(nextSteps, stepsInProgress) + if _.isEmpty(nextSteps) + allAppIds = _.union(_.keys(currentByAppId), _.keys(targetByAppId)) + for appId in allAppIds + nextSteps = nextSteps.concat(@_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], availableImages, stepsInProgress)) + return @_removeDuplicateSteps(nextSteps, stepsInProgress) _removeDuplicateSteps: (nextSteps, stepsInProgress) -> withoutProgressDups = _.filter nextSteps, (step) -> @@ -779,88 +844,24 @@ module.exports = class ApplicationManager extends EventEmitter stopAll: ({ force = false } = {}) => @services.getAll() .map (service) => - Promise.using @_lockIfNecessary(service.appId, { force }), => + @_lockingIfNecessary service.appId, { force }, => @services.kill(service, { removeContainer: false }) - _lockIfNecessary: (appId, { force = false, skipLock = false } = {}) => - return Promise.resolve() if skipLock + _lockingIfNecessary: (appId, { force = false, skipLock = false } = {}, fn) => + if skipLock + return Promise.resolve() @config.get('lockOverride') .then (lockOverride) -> return lockOverride or force .then (force) -> - updateLock.lock(appId, { force }) + updateLock.lock(appId, { force }, fn) executeStepAction: (step, { force = false } = {}) => if _.includes(@proxyvisor.validActions, step.action) return @proxyvisor.executeStepAction(step) if !_.includes(@validActions, step.action) return Promise.reject(new Error("Invalid action #{step.action}")) - actionExecutors = - stop: => - Promise.using @_lockIfNecessary(step.current.appId, { force, skipLock: step.options?.skipLock }), => - @services.kill(step.current, { removeContainer: false }) - kill: => - Promise.using @_lockIfNecessary(step.current.appId, { force, skipLock: step.options?.skipLock }), => - @services.kill(step.current) - .then => - @images.remove(imageForService(step.current)) if step.options?.removeImage - updateReleaseId: => - @services.updateReleaseId(step.current, step.target.releaseId) - purge: => - appId = step.appId - @logger.logSystemMessage("Purging data for app #{appId}", { appId }, 'Purge data') - Promise.using @_lockIfNecessary(appId, { force, skipLock: step.options?.skipLock }), => - @getCurrentApp(appId) - .then (app) => - throw new Error('Attempt to purge app with running services') if !_.isEmpty(app?.services) - if _.isEmpty(app?.volumes) - @logger.logSystemMessage('No volumes to purge', { appId }, 'Purge data noop') - return - Promise.mapSeries _.toPairs(app.volumes ? {}), ([ name, config ]) => - @volumes.remove({ name }) - .then => - @volumes.create({ name, config, appId }) - .then => - @logger.logSystemMessage('Purged data', { appId }, 'Purge data success') - .catch (err) => - @logger.logSystemMessage("Error purging data: #{err}", { appId, error: err }, 'Purge data error') - throw err - restart: => - Promise.using @_lockIfNecessary(step.current.appId, { force, skipLock: step.options?.skipLock }), => - Promise.try => - @services.kill(step.current) - .then => - @services.start(step.target) - stopAll: => - @stopAll({ force }) - start: => - @services.start(step.target) - handover: => - Promise.using @_lockIfNecessary(step.current.appId, { force, skipLock: step.options?.skipLock }), => - @services.handover(step.current, step.target) - fetch: => - Promise.join( - @config.get('fetchOptions') - @images.getAvailable() - (opts, availableImages) => - opts.deltaSource = @bestDeltaSource(step.image, availableImages) - @images.fetch(step.image, opts) - ) - .finally => - @reportCurrentState(update_downloaded: true) - removeImage: => - @images.remove(step.image) - updateImage: => - @images.update(step.target) - cleanup: => - @images.cleanup() - createNetworkOrVolume: => - model = if step.model is 'volume' then @volumes else @networks - model.create(step.target) - removeNetworkOrVolume: => - model = if step.model is 'volume' then @volumes else @networks - model.remove(step.current) - actionExecutors[step.action]() + @actionExecutors[step.action](step, { force }) getRequiredSteps: (currentState, targetState, stepsInProgress) => Promise.join( diff --git a/src/compose/images.coffee b/src/compose/images.coffee index 6c778641..1297629d 100644 --- a/src/compose/images.coffee +++ b/src/compose/images.coffee @@ -6,7 +6,7 @@ constants = require '../lib/constants' validation = require '../lib/validation' ImageNotFoundError = (err) -> - return "#{err.statusCode}" is '404' + return validation.checkInt(err.statusCode) is 404 # image = { # name: image registry/repo:tag @@ -17,7 +17,7 @@ ImageNotFoundError = (err) -> # releaseId # dependent # status Downloading, Downloaded, Deleting -# download_progress +# downloadProgress # } module.exports = class Images extends EventEmitter @@ -37,7 +37,7 @@ module.exports = class Images extends EventEmitter fetch: (image, opts) => onProgress = (progress) => - @reportChange(image.imageId, { download_progress: progress.percentage }) + @reportChange(image.imageId, { downloadProgress: progress.percentage }) @normalise(image.name) .then (imageName) => @@ -47,7 +47,7 @@ module.exports = class Images extends EventEmitter .then => @inspectByName(imageName) .catch => - @reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', download_progress: 0 })) + @reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 })) Promise.try => if validation.checkTruthy(opts.delta) @logger.logSystemEvent(logTypes.downloadImageDelta, { image }) @@ -81,19 +81,14 @@ module.exports = class Images extends EventEmitter @db.models('image').update(image).where(name: image.name) _removeImageIfNotNeeded: (image) => - removed = true @inspectByName(image.name) - .catch ImageNotFoundError, (err) -> - removed = false - return null .then (img) => - if img? - @db.models('image').where(name: image.name).select() - .then (imagesFromDB) => - if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(image)) - @docker.getImage(image.name).remove(force: true) - .then -> - return removed + @db.models('image').where(name: image.name).select() + .then (imagesFromDB) => + if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(image)) + @docker.getImage(image.name).remove(force: true) + .return(true) + .catchReturn(ImageNotFoundError, false) remove: (image) => @reportChange(image.imageId, _.merge(_.clone(image), { status: 'Deleting' })) @@ -127,12 +122,12 @@ module.exports = class Images extends EventEmitter _isAvailableInDocker: (image, dockerImages) -> _.some dockerImages, (dockerImage) -> - _.includes(dockerImage.NormalisedRepoTags, image.name) + _.includes(dockerImage.NormalisedRepoTags, image.name) or _.includes(dockerImage.RepoDigests, image.name) # Gets all images that are supervised, in an object containing name, appId, serviceId, serviceName, imageId, dependent. getAvailable: => @_withImagesFromDockerAndDB (dockerImages, supervisedImages) => - return _.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages)) + _.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages)) cleanupDatabase: => @_withImagesFromDockerAndDB (dockerImages, supervisedImages) => @@ -145,15 +140,15 @@ module.exports = class Images extends EventEmitter @getAvailable() .map (image) -> image.status = 'Downloaded' - image.download_progress = null + image.downloadProgress = null return image .then (images) => status = _.clone(@volatileState) - _.forEach images, (image) -> + for image in images status[image.imageId] ?= image return _.values(status) - _getDanglingAndOldSupervisorsForCleanup: => + _getOldSupervisorsForCleanup: => images = [] @docker.getRegistryAndName(constants.supervisorImage) .then (supervisorImageInfo) => @@ -164,13 +159,9 @@ module.exports = class Images extends EventEmitter .then ({ imageName, tagName }) -> if imageName == supervisorImageInfo.imageName and tagName != supervisorImageInfo.tagName images.push(repoTag) - .then => - @docker.listImages(filters: { dangling: [ 'true' ] }) - .map (image) -> - images.push(image.Id) .then => return _.filter images, (image) => - !@imageCleanupFailures[image]? or Date.now() - @imageCleanupFailures[image] > 3600 * 1000 + !@imageCleanupFailures[image]? or Date.now() - @imageCleanupFailures[image] > constants.imageCleanupErrorIgnoreTimeout inspectByName: (imageName) => @docker.getImage(imageName).inspect() @@ -179,17 +170,25 @@ module.exports = class Images extends EventEmitter @docker.normaliseImageName(imageName) isCleanupNeeded: => - @_getDanglingAndOldSupervisorsForCleanup() + @_getOldSupervisorsForCleanup() .then (imagesForCleanup) -> return !_.isEmpty(imagesForCleanup) - # Delete old supervisor images and dangling images + # Delete old supervisor images cleanup: => - @_getDanglingAndOldSupervisorsForCleanup() + @_getOldSupervisorsForCleanup() .map (image) => + console.log("Cleaning up #{image}") @docker.getImage(image).remove(force: true) .then => delete @imageCleanupFailures[image] .catch (err) => @logger.logSystemMessage("Error cleaning up #{image}: #{err.message} - will ignore for 1 hour", { error: err }, 'Image cleanup error') @imageCleanupFailures[image] = Date.now() + + @isSameImage: (image1, image2) -> + hash1 = image1.name.split('@')[1] + hash2 = image2.name.split('@')[1] + return image1.name == image2.name or (hash1? and hash1 == hash2) + + isSameImage: @isSameImage diff --git a/src/compose/networks.coffee b/src/compose/networks.coffee index d7478432..d5f6d1ec 100644 --- a/src/compose/networks.coffee +++ b/src/compose/networks.coffee @@ -1,35 +1,30 @@ -Promise = require 'bluebird' -_ = require 'lodash' - logTypes = require '../lib/log-types' +{ checkInt } = require '../lib/validation' module.exports = class Networks constructor: ({ @docker, @logger }) -> + # TODO: parse supported config fields format: (network) -> return { - appId: network.Labels['io.resin.appId'] + appId: checkInt(network.Labels['io.resin.appId']) name: network.Name config: {} } getAll: => @docker.listNetworks(filters: label: [ 'io.resin.supervised' ]) - .then (networks) => - Promise.map networks, (network) => - @docker.getNetwork(network.Name).inspect() - .then (net) => - @format(net) + .map (network) => + @docker.getNetwork(network.Name).inspect() + .then(@format) getAllByAppId: (appId) => @getAll() - .then (networks) -> - _.filter(networks, (v) -> v.appId == appId) + .filter((network) -> network.appId == appId) get: (name) => @docker.getNetwork(name).inspect() - .then (network) -> - return @format(network) + .then(@format) # TODO: what config values are relevant/whitelisted? create: ({ name, config, appId }) => @@ -38,7 +33,7 @@ module.exports = class Networks Name: name Labels: { 'io.resin.supervised': 'true' - 'io.resin.appId': appId + 'io.resin.appId': appId.toString() } }) .catch (err) => @@ -52,5 +47,6 @@ module.exports = class Networks @logger.logSystemEvent(logTypes.removeNetworkError, { network: { name }, error: err }) throw err + # TODO: compare supported config fields isEqualConfig: (current, target) -> return true diff --git a/src/compose/service-manager.coffee b/src/compose/service-manager.coffee index 762b097f..04a2844a 100644 --- a/src/compose/service-manager.coffee +++ b/src/compose/service-manager.coffee @@ -1,7 +1,7 @@ Promise = require 'bluebird' _ = require 'lodash' EventEmitter = require 'events' -JSONParser = require 'jsonparse' +JSONStream = require 'JSONStream' fs = Promise.promisifyAll(require('fs')) logTypes = require '../lib/log-types' @@ -9,8 +9,9 @@ logTypes = require '../lib/log-types' constants = require '../lib/constants' Service = require './service' -#restartVars = (conf) -> -# return _.pick(conf, [ 'RESIN_DEVICE_RESTART', 'RESIN_RESTART' ]) + +NotFoundError = (err) -> + return checkInt(err.statusCode) is 404 module.exports = class ServiceManager extends EventEmitter constructor: ({ @docker, @logger, @config }) -> @@ -24,12 +25,9 @@ module.exports = class ServiceManager extends EventEmitter # so we need to stop and remove them @docker.getImage(constants.supervisorImage).inspect() .then (supervisorImage) => - Promise.map(@docker.listContainers(all: true), (container) => - @docker.getImage(container.Image).inspect() - .then (containerImage) => - if containerImage.Id != supervisorImage.Id - @_killContainer(container.Id, { serviceName: 'legacy', image: container.Image }, { removeContainer: true }) - ) + Promise.map @docker.listContainers(all: true), (container) => + if container.ImageID != supervisorImage.Id + @_killContainer(container.Id, { serviceName: 'legacy', image: container.ImageID }, { removeContainer: true }) reportChange: (containerId, status) -> if status? @@ -45,35 +43,38 @@ module.exports = class ServiceManager extends EventEmitter _killContainer: (containerId, service = {}, { removeContainer = true }) => @logger.logSystemEvent(logTypes.stopService, { service }) - @reportNewStatus(containerId, service, 'Stopping') if service.imageId? + if service.imageId? + @reportNewStatus(containerId, service, 'Stopping') containerObj = @docker.getContainer(containerId) containerObj.stop(t: 10) .then -> - containerObj.remove(v: true) if removeContainer + if removeContainer + containerObj.remove(v: true) .catch (err) => - # Get the statusCode from the original cause and make sure statusCode its definitely a string for comparison + # Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison # reasons. - statusCode = '' + err.statusCode + statusCode = checkInt(err.statusCode) # 304 means the container was already stopped - so we can just remove it - if statusCode is '304' + if statusCode is 304 @logger.logSystemEvent(logTypes.stopServiceNoop, { service }) if removeContainer return containerObj.remove(v: true) return # 404 means the container doesn't exist, precisely what we want! :D - if statusCode is '404' + if statusCode is 404 @logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service }) return throw err .tap => - delete @containerHasDied[containerId] if @containerHasDied[containerId]? + delete @containerHasDied[containerId] .tap => @logger.logSystemEvent(logTypes.stopServiceSuccess, { service }) .catch (err) => @logger.logSystemEvent(logTypes.stopServiceError, { service, error: err }) throw err .finally => - @reportChange(containerId) if service.imageId? + if service.imageId? + @reportChange(containerId) kill: (service, { removeContainer = true } = {}) => @_killContainer(service.containerId, service, { removeContainer }) @@ -83,8 +84,6 @@ module.exports = class ServiceManager extends EventEmitter getAllByAppId: (appId) => @getAll("io.resin.app_id=#{appId}") - .filter (service) -> - service.appId == appId stopAllByAppId: (appId) => Promise.map @getAllByAppId(appId), (service) => @@ -93,8 +92,9 @@ module.exports = class ServiceManager extends EventEmitter create: (service) => mockContainerId = @config.newUniqueKey() @get(service) - .then ([ existingService ]) => - return @docker.getContainer(existingService.containerId) if existingService? + .then (existingService) => + if existingService? + return @docker.getContainer(existingService.containerId) conf = service.toContainerConfig() @logger.logSystemEvent(logTypes.installService, { service }) @reportNewStatus(mockContainerId, service, 'Installing') @@ -118,13 +118,13 @@ module.exports = class ServiceManager extends EventEmitter @reportNewStatus(containerId, service, 'Starting') container.start() .catch (err) => - statusCode = '' + err.statusCode + statusCode = checkInt(err.statusCode) # 304 means the container was already started, precisely what we want :) - if statusCode is '304' + if statusCode is 304 alreadyStarted = true return - if statusCode is '500' and err.message?.trim?()?.match(/exec format error$/) + if statusCode is 500 and err.message?.trim?()?.match(/exec format error$/) # Provide a friendlier error message for "exec format error" @config.get('deviceType') .then (deviceType) -> @@ -154,23 +154,21 @@ module.exports = class ServiceManager extends EventEmitter getAll: (extraLabelFilters = []) => filters = label: [ 'io.resin.supervised' ].concat(extraLabelFilters) @docker.listContainers({ all: true, filters }) - .then (containers) => - Promise.mapSeries containers, (container) => - @docker.getContainer(container.Id).inspect() - .then(Service.fromContainer) + .mapSeries (container) => + @docker.getContainer(container.Id).inspect() + .then(Service.fromContainer) # Returns an array with the container(s) matching a service by appId, commit, image and environment get: (service) => @getAll("io.resin.service_id=#{service.serviceId}") - .then (services = []) -> - return _.filter services, (currentService) -> - currentService.isSameContainer(service) + .filter((currentService) -> currentService.isSameContainer(service)) + .get(0) getStatus: => @getAll() .then (services) => status = _.clone(@volatileState) - _.forEach services, (service) -> + for service in services status[service.containerId] ?= _.pick(service, [ 'appId', 'imageId', 'status', 'releaseId', 'commit' ]) return _.values(status) @@ -180,37 +178,35 @@ module.exports = class ServiceManager extends EventEmitter if !container.Config.Labels['io.resin.supervised']? return null return Service.fromContainer(container) - .catchReturn(null) waitToKill: (service, timeout) -> - startTime = Date.now() pollInterval = 100 timeout = checkInt(timeout, positive: true) ? 60000 - checkFileOrTimeout = -> - killme = service.killmeFullPathOnHost() - fs.statAsync(killme) - .catch (err) -> - throw err unless (Date.now() - startTime) > timeout + deadline = Date.now() + timeout + + killmePath = service.killmeFullPathOnHost() + + wait = -> + fs.statAsync(killmePath) .then -> - fs.unlinkAsync(killme).catch(_.noop) - retryCheck = -> - checkFileOrTimeout() - .catch -> - Promise.delay(pollInterval).then(retryCheck) - retryCheck() + fs.unlinkAsync(killmePath).catch(_.noop) + .catch (err) -> + if Date.now() < deadline + Promise.delay(pollInterval).then(wait) + wait() prepareForHandover: (service) => @get(service) - .then ([ svc ]) => + .then (svc) => container = @docker.getContainer(svc.containerId) container.update(RestartPolicy: {}) .then -> - container.rename(name: "old_#{service.serviceName}_#{service.releaseId}") + container.rename(name: "old_#{service.serviceName}_#{service.imageId}_#{service.releaseId}") - updateReleaseId: (service, releaseId) => + updateMetadata: (service, { imageId, releaseId }) => @get(service) - .then ([ svc ]) => - @docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{releaseId}") + .then (svc) => + @docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{imageId}_#{releaseId}") handover: (currentService, targetService) => # We set the running container to not restart so that in case of a poweroff @@ -224,46 +220,43 @@ module.exports = class ServiceManager extends EventEmitter @kill(currentService) listenToEvents: => - return if @listening + if @listening + return @listening = true @docker.getEvents(filters: type: [ 'container' ]) .then (stream) => - parser = new JSONParser() - parser.onError = (err) -> - console.error('Error on docker events stream', err, err.stack) - stream.destroy() - parser.onValue = (data) => - return if parser.stack.length != 0 + stream.on 'error', (err) -> + console.error('Error on docker events stream:', err, err.stack) + parser = JSONStream.parse() + parser.on 'data', (data) => if data?.status in ['die', 'start'] - setImmediate => - @getByDockerContainerId(data.id) - .then (service) => - if service? - if data.status == 'die' - @logger.logSystemEvent(logTypes.serviceExit, { service }) - @containerHasDied[data.id] = true - else if data.status == 'start' and @containerHasDied[data.id] - @logger.logSystemEvent(logTypes.serviceRestart, { service }) - @logger.attach(@docker, data.id, service.serviceId) - .catch (err) -> - console.error('Error on docker event:', err, err.stack) + @getByDockerContainerId(data.id) + .catchReturn(NotFoundError, null) + .then (service) => + if service? + if data.status == 'die' + @logger.logSystemEvent(logTypes.serviceExit, { service }) + @containerHasDied[data.id] = true + else if data.status == 'start' and @containerHasDied[data.id] + delete @containerHasDied[data.id] + @logger.logSystemEvent(logTypes.serviceRestart, { service }) + @logger.attach(@docker, data.id, service.serviceId) + .catch (err) -> + console.error('Error on docker event:', err, err.stack) + return new Promise (resolve, reject) -> - stream + parser .on 'error', (err) -> console.error('Error on docker events stream', err) reject() - .on 'data', (data) -> - if typeof data is 'string' - data = new Buffer(data) - parser.write(data) .on 'end', -> console.error('Docker events stream ended, restarting listener') resolve() - .finally => - @listening = false - setImmediate( => @listenToEvents()) - .catch (err) => - console.error('Error starting to listen to events:', err, err.stack) + stream.pipe(parser) + .catch (err) -> + console.error('Error listening to events:', err, err.stack) + .finally => + @listening = false setTimeout => @listenToEvents() , 1000 diff --git a/src/compose/service.coffee b/src/compose/service.coffee index da3f771c..c8529869 100644 --- a/src/compose/service.coffee +++ b/src/compose/service.coffee @@ -5,6 +5,8 @@ updateLock = require '../lib/update-lock' constants = require '../lib/constants' conversions = require '../lib/conversions' +Images = require './images' + validRestartPolicies = [ 'no', 'always', 'on-failure', 'unless-stopped' ] # Construct a restart policy based on its name. @@ -41,13 +43,14 @@ formatDevices = (devices) -> CgroupPermissions ?= 'rwm' return { PathOnHost, PathInContainer, CgroupPermissions } +# TODO: Support "networks" too, instead of only networkMode module.exports = class Service constructor: (serviceProperties, opts = {}) -> { @image @expose @ports - @network_mode + @networkMode @privileged @releaseId @imageId @@ -63,15 +66,15 @@ module.exports = class Service @labels @volumes @restartPolicy - @depends_on - @cap_add - @cap_drop + @dependsOn + @capAdd + @capDrop @commit @status @devices @exposedPorts @portBindings - } = serviceProperties + } = _.mapKeys(serviceProperties, (v, k) -> _.camelCase(k)) @privileged ?= false @volumes ?= [] @labels ?= {} @@ -79,14 +82,12 @@ module.exports = class Service @running ?= true @ports ?= [] @expose ?= [] - @cap_add ?= [] - @cap_drop ?= [] + @capAdd ?= [] + @capDrop ?= [] @devices ?= [] @exposedPorts ?= {} @portBindings ?= {} - @network_mode ?= @appId.toString() - if @releaseId? - @releaseId = @releaseId.toString() + @networkMode ?= @appId.toString() # If the service has no containerId, it is a target service and has to be normalised and extended if !@containerId? @@ -142,7 +143,6 @@ module.exports = class Service @labels['io.resin.app_id'] = @appId.toString() @labels['io.resin.service_id'] = @serviceId.toString() @labels['io.resin.service_name'] = @serviceName - @labels['io.resin.image_id'] = @imageId.toString() @labels['io.resin.commit'] = @commit return @labels @@ -159,7 +159,7 @@ module.exports = class Service extendAndSanitiseVolumes: (imageInfo) => volumes = [] - _.forEach @volumes, (vol) -> + for vol in @volumes isBind = /:/.test(vol) if isBind bindSource = vol.split(':')[0] @@ -177,8 +177,8 @@ module.exports = class Service getNamedVolumes: => defaults = @defaultBinds() validVolumes = _.map @volumes, (vol) -> - return null if _.includes(defaults, vol) - return null if !/:/.test(vol) + if _.includes(defaults, vol) or !/:/.test(vol) + return null bindSource = vol.split(':')[0] if !path.isAbsolute(bindSource) return bindSource @@ -223,18 +223,20 @@ module.exports = class Service if containerPort? and !_.includes(boundContainerPorts, containerPort) expose.push(containerPort) - appId = container.Config.Labels['io.resin.app_id'] - serviceId = container.Config.Labels['io.resin.service_id'] + appId = checkInt(container.Config.Labels['io.resin.app_id']) + serviceId = checkInt(container.Config.Labels['io.resin.service_id']) serviceName = container.Config.Labels['io.resin.service_name'] - releaseId = container.Name.match(/.*_(\d+)$/)?[1] + nameComponents = container.Name.match(/.*_(\d+)_(\d+)$/) + imageId = checkInt(nameComponents?[1]) + releaseId = checkInt(nameComponents?[2]) service = { appId: appId serviceId: serviceId serviceName: serviceName - imageId: checkInt(container.Config.Labels['io.resin.image_id']) + imageId: imageId command: container.Config.Cmd entrypoint: container.Config.Entrypoint - network_mode: container.HostConfig.NetworkMode + networkMode: container.HostConfig.NetworkMode volumes: _.concat(container.HostConfig.Binds ? [], _.keys(container.Config.Volumes ? {})) image: container.Config.Image environment: conversions.envArrayToObject(container.Config.Env) @@ -248,13 +250,16 @@ module.exports = class Service ports: ports expose: expose containerId: container.Id - cap_add: container.HostConfig.CapAdd - cap_drop: container.HostConfig.CapDrop + capAdd: container.HostConfig.CapAdd + capDrop: container.HostConfig.CapDrop devices: container.HostConfig.Devices status exposedPorts: container.Config.ExposedPorts portBindings: container.HostConfig.PortBindings } + # I've seen docker use either 'no' or '' for no restart policy, so we normalise to 'no'. + if service.restartPolicy.Name == '' + service.restartPolicy.Name = 'no' return new Service(service) # TODO: map ports for any of the possible formats "container:host/protocol", port ranges, etc. @@ -262,20 +267,20 @@ module.exports = class Service exposedPorts = {} portBindings = {} if @ports? - _.forEach @ports, (port) -> + for port in @ports [ hostPort, containerPort ] = port.toString().split(':') containerPort ?= hostPort exposedPorts[containerPort + '/tcp'] = {} portBindings[containerPort + '/tcp'] = [ { HostIp: '', HostPort: hostPort } ] if @expose? - _.forEach @expose, (port) -> + for port in @expose exposedPorts[port + '/tcp'] = {} return { exposedPorts, portBindings } getBindsAndVolumes: => binds = [] volumes = {} - _.forEach @volumes, (vol) -> + for vol in @volumes isBind = /:/.test(vol) if isBind binds.push(vol) @@ -285,9 +290,8 @@ module.exports = class Service toContainerConfig: => { binds, volumes } = @getBindsAndVolumes() - conf = { - name: "#{@serviceName}_#{@releaseId}" + name: "#{@serviceName}_#{@imageId}_#{@releaseId}" Image: @image Cmd: @command Entrypoint: @entrypoint @@ -298,16 +302,17 @@ module.exports = class Service Labels: @labels HostConfig: Privileged: @privileged - NetworkMode: @network_mode + NetworkMode: @networkMode PortBindings: @portBindings Binds: binds - RestartPolicy: @restartPolicy - CapAdd: @cap_add - CapDrop: @cap_drop + CapAdd: @capAdd + CapDrop: @capDrop Devices: @devices } + if @restartPolicy.Name != 'no' + conf.HostConfig.RestartPolicy = @restartPolicy # If network mode is the default network for this app, add alias for serviceName - if @network_mode == @appId.toString() + if @networkMode == @appId.toString() conf.NetworkingConfig = { EndpointsConfig: { "#{@appId}": { @@ -319,9 +324,7 @@ module.exports = class Service isSameContainer: (otherService) => propertiesToCompare = [ - 'image' - 'imageId' - 'network_mode' + 'networkMode' 'privileged' 'restartPolicy' 'labels' @@ -332,12 +335,16 @@ module.exports = class Service arraysToCompare = [ 'volumes' 'devices' - 'cap_add' - 'cap_drop' + 'capAdd' + 'capDrop' ] - return _.isEqual(_.pick(this, propertiesToCompare), _.pick(otherService, propertiesToCompare)) and + return Images.isSameImage({ name: @image }, { name: otherService.image }) and + _.isEqual(_.pick(this, propertiesToCompare), _.pick(otherService, propertiesToCompare)) and _.every arraysToCompare, (property) => _.isEmpty(_.xorWith(this[property], otherService[property], _.isEqual)) isEqual: (otherService) => - return @isSameContainer(otherService) and @running == otherService.running and @releaseId == otherService.releaseId + return @isSameContainer(otherService) and + @running == otherService.running and + @releaseId == otherService.releaseId and + @imageId == otherService.imageId diff --git a/src/compose/volumes.coffee b/src/compose/volumes.coffee index d457779a..6a738ece 100644 --- a/src/compose/volumes.coffee +++ b/src/compose/volumes.coffee @@ -1,28 +1,23 @@ Promise = require 'bluebird' _ = require 'lodash' fs = Promise.promisifyAll(require('fs')) -ncp = Promise.promisify(require('ncp').ncp) -rimraf = Promise.promisify(require('rimraf')) -exec = Promise.promisify(require('child_process').exec) -ncp.limit = 16 +path = require 'path' logTypes = require '../lib/log-types' -migration = require '../lib/migration' constants = require '../lib/constants' - -ENOENT = (err) -> err.code is 'ENOENT' +{ checkInt } = require '../lib/validation' module.exports = class Volumes constructor: ({ @docker, @logger }) -> format: (volume) -> - appId = volume.Labels['io.resin.app_id'] + appId = checkInt(volume.Labels['io.resin.app_id']) return { name: volume.Name appId config: { labels: _.omit(volume.Labels, _.keys(@defaultLabels(appId))) - driver_opts: volume.Options + driverOpts: volume.Options } } @@ -48,15 +43,17 @@ module.exports = class Volumes defaultLabels: (appId) -> return { 'io.resin.supervised': 'true' - 'io.resin.app_id': appId + 'io.resin.app_id': appId.toString() } # TODO: what config values are relevant/whitelisted? + # For now we only care about driverOpts and labels create: ({ name, config = {}, appId }) => + config = _.mapKeys(config, (v, k) -> _.camelCase(k)) @logger.logSystemEvent(logTypes.createVolume, { volume: { name } }) labels = _.clone(config.labels) ? {} _.assign(labels, @defaultLabels(appId)) - driverOpts = config.driver_opts ? {} + driverOpts = config.driverOpts ? {} @docker.createVolume({ Name: name Labels: labels @@ -67,32 +64,22 @@ module.exports = class Volumes throw err createFromLegacy: (appId) => - name = migration.defaultLegacyVolume(appId) + name = "resin-data-#{appId}" @create({ name, appId }) .then (v) -> v.inspect() .then (v) -> - volumePath = "#{constants.rootMountPoint}#{v.Mountpoint}" - legacyPath = "/mnt/root/resin-data/#{appId}" - fs.lstatAsync(legacyPath) - .catch ENOENT, (err) -> - fs.lstatAsync(legacyPath + '-old') + volumePath = path.join(constants.rootMountPoint, v.Mountpoint) + legacyPath = path.join(constants.rootMountPoint, constants.dataPath, appId.toString()) + fs.renameAsync(legacyPath, volumePath) + .then -> + fs.openAsync(path.dirname(volumePath)) + .then (parent) -> + fs.fsyncAsync(parent) .then -> - rimraf(legacyPath + '-old') - .finally -> - throw err - .then -> - ncp(legacyPath, volumePath) - .then -> - exec('sync') - .then -> - # Before deleting, we rename so that if there's an unexpected poweroff - # next time we won't copy a partially deleted folder into the volume - fs.renameAsync(legacyPath, legacyPath + '-old') - .then -> - rimraf(legacyPath + '-old') + fs.closeAsync(parent) .catch (err) -> - console.log("Ignoring legacy data volume migration due to #{err}") + @logger.logSystemMessage("Warning: could not migrate legacy /data volume: #{err.message}", { error: err }, 'Volume migration error') remove: ({ name }) -> @logger.logSystemEvent(logTypes.removeVolume, { volume: { name } }) @@ -100,9 +87,11 @@ module.exports = class Volumes .catch (err) => @logger.logSystemEvent(logTypes.removeVolumeError, { volume: { name }, error: err }) - isEqualConfig: (current, target) -> - currentOpts = current?.driver_opts ? {} - targetOpts = target?.driver_opts ? {} - currentLabels = current?.labels ? {} - targetLabels = target?.labels ? {} + isEqualConfig: (current = {}, target = {}) -> + current = _.mapKeys(current, (v, k) -> _.camelCase(k)) + target = _.mapKeys(target, (v, k) -> _.camelCase(k)) + currentOpts = current.driverOpts ? {} + targetOpts = target.driverOpts ? {} + currentLabels = current.labels ? {} + targetLabels = target.labels ? {} return _.isEqual(currentLabels, targetLabels) and _.isEqual(currentOpts, targetOpts) diff --git a/src/config.coffee b/src/config.coffee index c13ad729..f4e8ebe8 100644 --- a/src/config.coffee +++ b/src/config.coffee @@ -145,6 +145,7 @@ module.exports = class Config extends EventEmitter deltaRetryCount: { source: 'db', mutable: true, default: '' } deltaRetryInterval: { source: 'db', mutable: true, default: '' } lockOverride: { source: 'db', mutable: true, default: 'false' } + legacyAppsPresent: { source: 'db', mutable: true, default: 'false' } } @configJsonCache = {} @@ -183,10 +184,12 @@ module.exports = class Config extends EventEmitter value = keyVals[key] if @configJsonCache[key] != value @configJsonCache[key] = value - delete @configJsonCache[key] if !value? and @schema[key].removeIfNull + if !value? and @schema[key].removeIfNull + delete @configJsonCache[key] changed = true .then => - @writeConfigJson() if changed + if changed + @writeConfigJson() configJsonRemove: (key) => changed = false @@ -196,19 +199,23 @@ module.exports = class Config extends EventEmitter delete @configJsonCache[key] changed = true .then => - @writeConfigJson() if changed + if changed + @writeConfigJson() configJsonPathOnHost: => Promise.try => - return @configPath if @configPath? - return @constants.configJsonPathOnHost if @constants.configJsonPathOnHost? + if @configPath? + return @configPath + if @constants.configJsonPathOnHost? + return @constants.configJsonPathOnHost osRelease.getOSVersion(@constants.hostOSVersionPath) .then (osVersion) => if /^Resin OS 2./.test(osVersion) return "#{@constants.bootMountPointFromEnv}/config.json" else if /^Resin OS 1./.test(osVersion) # In Resin OS 1.12, $BOOT_MOUNTPOINT was added and it coincides with config.json's path - return "#{@constants.bootMountPointFromEnv}/config.json" if @constants.bootMountPointFromEnv + if @constants.bootMountPointFromEnv + return "#{@constants.bootMountPointFromEnv}/config.json" # Older 1.X versions have config.json here return '/mnt/conf/config.json' else @@ -287,7 +294,8 @@ module.exports = class Config extends EventEmitter Promise.try => # Write value to config.json or DB { configJsonVals, dbVals } = _.reduce(keyValues, (acc, val, key) => - throw new Error("Attempt to change immutable config value #{key}") if !@schema[key]?.mutable + if !@schema[key]?.mutable + throw new Error("Attempt to change immutable config value #{key}") switch @schema[key]?.source when 'config.json' acc.configJsonVals[key] = val @@ -305,7 +313,8 @@ module.exports = class Config extends EventEmitter if oldValues[key] != value @db.upsertModel('config', { key, value }, { key }, tx) .then => - @configJsonSet(configJsonVals) if !_.isEmpty(configJsonVals) + if !_.isEmpty(configJsonVals) + @configJsonSet(configJsonVals) if trx? setValuesInTransaction(trx) else @@ -320,7 +329,8 @@ module.exports = class Config extends EventEmitter # only mutable fields! remove: (key) => Promise.try => - throw new Error("Attempt to delete immutable config value #{key}") if !@schema[key]?.mutable + if !@schema[key]?.mutable + throw new Error("Attempt to delete immutable config value #{key}") switch @schema[key]?.source when 'config.json' @configJsonRemove(key) diff --git a/src/db.coffee b/src/db.coffee index 8e2ec03f..80b2b1b4 100644 --- a/src/db.coffee +++ b/src/db.coffee @@ -1,6 +1,5 @@ -Promise = require 'bluebird' Knex = require 'knex' - +path = require 'path' constants = require './lib/constants' module.exports = class DB @@ -13,206 +12,8 @@ module.exports = class DB useNullAsDefault: true ) - addColumn: (table, column, type) => - @knex.schema.hasColumn(table, column) - .then (exists) => - if not exists - @knex.schema.table table, (t) -> - t[type](column) - - dropColumn: (table, column) => - @knex.schema.hasColumn(table, column) - .then (exists) => - if exists - @knex.schema.table table, (t) -> - t.dropColumn(column) - - dropTableIfExists: (tableName, trx) => - knex = trx ? @knex - knex.schema.hasTable(tableName) - .then (exists) -> - knex.schema.dropTable(tableName) if exists - - _migrateToV2: => - # Drop all tables, but keep the info we need - @transaction (trx) => - trx.schema.hasTable('legacyData') - .then (exists) => - if not exists - trx.schema.createTable 'legacyData', (t) -> - t.json('apps') - t.json('dependentApps') - t.json('dependentDevices') - .then => - Promise.join( - trx.schema.hasTable('app') - .then (exists) -> - if exists - trx.select().from('app') - else - return [] - .then(JSON.stringify) - trx.schema.hasTable('dependentDevice') - .then (exists) -> - if exists - trx.select().from('dependentDevice') - else - return [] - .then(JSON.stringify) - trx.schema.hasTable('dependentApp') - .then (exists) -> - if exists - trx.select().from('dependentApp') - else - return [] - .then(JSON.stringify) - (apps, dependentDevices, dependentApps) => - @upsertModel('legacyData', { apps, dependentDevices, dependentApps }, {}, trx) - ) - .then => - @dropTableIfExists('app', trx) - .then => - @dropTableIfExists('deviceConfig', trx) - .then => - @dropTableIfExists('dependentApp', trx) - .then => - @dropTableIfExists('dependentDevice', trx) - .then => - @dropTableIfExists('image', trx) - .then => - @dropTableIfExists('container', trx) - - finishMigration: => - @transaction (trx) => - @upsertModel('config', { key: 'schema-version', value: '2' }, { key: 'schema-version' }, trx) - .then => - @dropTableIfExists('legacyData', trx) - - _initConfigAndGetSchemaVersion: => - @knex.schema.hasTable('config') - .then (exists) => - if not exists - @knex.schema.createTable 'config', (t) -> - t.string('key').primary() - t.string('value') - .then => - @knex('config').insert({ key: 'schema-version', value: '2' }) - .then => - @knex('config').where({ key: 'schema-version' }).select() - .then ([ schemaVersion ]) -> - return schemaVersion - init: => - migrationNeeded = false - @_initConfigAndGetSchemaVersion() - .then (schemaVersion) => - if !schemaVersion? or schemaVersion.value != '2' - # We're on an old db, need to migrate - migrationNeeded = true - @_migrateToV2() - .then => - Promise.all([ - @knex.schema.hasTable('deviceConfig') - .then (exists) => - if not exists - @knex.schema.createTable 'deviceConfig', (t) -> - t.json('targetValues') - .then => - @knex('deviceConfig').select() - .then (deviceConfigs) => - @knex('deviceConfig').insert({ targetValues: '{}' }) if deviceConfigs.length == 0 - - @knex.schema.hasTable('app') - .then (exists) => - if not exists - @knex.schema.createTable 'app', (t) -> - t.increments('id').primary() - t.string('name') - t.string('releaseId') - t.string('commit') - t.string('appId') - t.json('services') - t.json('networks') - t.json('volumes') - - @knex.schema.hasTable('dependentAppTarget') - .then (exists) => - if not exists - @knex.schema.createTable 'dependentAppTarget', (t) -> - t.increments('id').primary() - t.string('appId') - t.string('parentApp') - t.string('name') - t.string('commit') - t.string('releaseId') - t.string('imageId') - t.string('image') - t.json('environment') - t.json('config') - - @knex.schema.hasTable('dependentDeviceTarget') - .then (exists) => - if not exists - @knex.schema.createTable 'dependentDeviceTarget', (t) -> - t.increments('id').primary() - t.string('uuid') - t.string('name') - t.json('apps') - - @knex.schema.hasTable('dependentApp') - .then (exists) => - if not exists - @knex.schema.createTable 'dependentApp', (t) -> - t.increments('id').primary() - t.string('appId') - t.string('parentApp') - t.string('name') - t.string('commit') - t.string('releaseId') - t.string('image') - t.json('environment') - t.json('config') - - @knex.schema.hasTable('dependentDevice') - .then (exists) => - if not exists - @knex.schema.createTable 'dependentDevice', (t) -> - t.increments('id').primary() - t.string('uuid') - t.string('appId') - t.string('localId') - t.string('device_type') - t.string('logs_channel') - t.string('deviceId') - t.boolean('is_online') - t.string('name') - t.string('status') - t.string('download_progress') - t.string('is_managed_by') - t.dateTime('lock_expiry_date') - t.string('commit') - t.string('targetCommit') - t.json('environment') - t.json('targetEnvironment') - t.json('config') - t.json('targetConfig') - t.boolean('markedForDeletion') - - @knex.schema.hasTable('image') - .then (exists) => - if not exists - @knex.schema.createTable 'image', (t) -> - t.increments('id').primary() - t.string('name') - t.string('appId') - t.string('serviceId') - t.string('serviceName') - t.string('imageId') - t.string('releaseId') - t.boolean('dependent') - ]) - .then -> - return migrationNeeded + @knex.migrate.latest(directory: path.join(__dirname, 'migrations')) # Returns a knex object for one of the models (tables) models: (modelName) => @@ -222,7 +23,8 @@ module.exports = class DB knex = trx ? @knex knex(modelName).update(obj).where(id) .then (n) -> - knex(modelName).insert(obj) if n == 0 + if n == 0 + knex(modelName).insert(obj) transaction: (cb) => @knex.transaction(cb) diff --git a/src/device-config.coffee b/src/device-config.coffee index 3c16178c..f3c9f367 100644 --- a/src/device-config.coffee +++ b/src/device-config.coffee @@ -37,7 +37,7 @@ arrayConfigKeys = [ 'dtparam', 'dtoverlay', 'device_tree_param', 'device_tree_ov module.exports = class DeviceConfig constructor: ({ @db, @config, @logger }) -> @rebootRequired = false - @validActions = [ 'changeConfig', 'setLogToDisplay', 'setBootConfig' ] + @validActions = _.keys(@actionExecutors) @configKeys = { appUpdatePollInterval: { envVarName: 'RESIN_SUPERVISOR_POLL_INTERVAL', varType: 'int', defaultValue: '60000' } localMode: { envVarName: 'RESIN_SUPERVISOR_LOCAL_MODE', varType: 'bool', defaultValue: 'false' } @@ -96,7 +96,7 @@ module.exports = class DeviceConfig targetBootConfig = @envToBootConfig(target) currentBootConfig = @envToBootConfig(current) if !_.isEqual(currentBootConfig, targetBootConfig) - _.forEach forbiddenConfigKeys, (key) => + for key in forbiddenConfigKeys if currentBootConfig[key] != targetBootConfig[key] err = "Attempt to change blacklisted config value #{key}" @logger.logSystemMessage(err, { error: err }, 'Apply boot config error') @@ -140,7 +140,8 @@ module.exports = class DeviceConfig action: 'setBootConfig' target }) - return if !_.isEmpty(steps) + if !_.isEmpty(steps) + return if @rebootRequired steps.push({ action: 'reboot' @@ -155,22 +156,24 @@ module.exports = class DeviceConfig return [{ action: 'noop' }] else return filteredSteps + actionExecutors: { + changeConfig: (step) => + @logger.logConfigChange(step.humanReadableTarget) + @config.set(step.target) + .then => + @logger.logConfigChange(step.humanReadableTarget, { success: true }) + .catch (err) => + @logger.logConfigChange(step.humanReadableTarget, { err }) + throw err + setLogToDisplay: (step) => + @setLogToDisplay(step.target) + setBootConfig: (step) => + @config.get('deviceType') + .then (deviceType) => + @setBootConfig(deviceType, step.target) + } executeStepAction: (step) => - switch step.action - when 'changeConfig' - @logger.logConfigChange(step.humanReadableTarget) - @config.set(step.target) - .then => - @logger.logConfigChange(step.humanReadableTarget, { success: true }) - .catch (err) => - @logger.logConfigChange(step.humanReadableTarget, { err }) - throw err - when 'setLogToDisplay' - @setLogToDisplay(step.target) - when 'setBootConfig' - @config.get('deviceType') - .then (deviceType) => - @setBootConfig(deviceType, step.target) + @actionExecutors[step.action](step) envToBootConfig: (env) -> # We ensure env doesn't have garbage @@ -199,32 +202,33 @@ module.exports = class DeviceConfig getBootConfig: (deviceType) => Promise.try => - return {} if !_.startsWith(deviceType, 'raspberry') + if !_.startsWith(deviceType, 'raspberry') + return {} @readBootConfig() .then (configTxt) => conf = {} configStatements = configTxt.split(/\r?\n/) - _.forEach configStatements, (configStr) -> + for configStr in configStatements keyValue = /^([^#=]+)=(.+)/.exec(configStr) if keyValue? if !_.includes(arrayConfigKeys, keyValue[1]) conf[keyValue[1]] = keyValue[2] - return else conf[keyValue[1]] ?= [] conf[keyValue[1]].push(keyValue[2]) - return - keyValue = /^(initramfs) (.+)/.exec(configStr) - if keyValue? - conf[keyValue[1]] = keyValue[2] - return + else + keyValue = /^(initramfs) (.+)/.exec(configStr) + if keyValue? + conf[keyValue[1]] = keyValue[2] return @bootConfigToEnv(conf) getLogToDisplay: -> gosuper.get('/v1/log-to-display', { json: true }) .spread (res, body) -> - return undefined if res.statusCode == 404 - throw new Error("Error getting log to display status: #{res.statusCode} #{body.Error}") if res.statusCode != 200 + if res.statusCode == 404 + return undefined + if res.statusCode != 200 + throw new Error("Error getting log to display status: #{res.statusCode} #{body.Error}") return Boolean(body.Data) setLogToDisplay: (val) => @@ -248,7 +252,8 @@ module.exports = class DeviceConfig setBootConfig: (deviceType, target) => Promise.try => conf = @envToBootConfig(target) - return false if !_.startsWith(deviceType, 'raspberry') + if !_.startsWith(deviceType, 'raspberry') + return false @logger.logSystemMessage("Applying boot config: #{JSON.stringify(conf)}", {}, 'Apply boot config in progress') configStatements = [] _.forEach conf, (val, key) -> @@ -275,7 +280,8 @@ module.exports = class DeviceConfig getVPNEnabled: -> gosuper.get('/v1/vpncontrol', { json: true }) .spread (res, body) -> - throw new Error("Error getting vpn status: #{res.statusCode} #{body.Error}") if res.statusCode != 200 + if res.statusCode != 200 + throw new Error("Error getting vpn status: #{res.statusCode} #{body.Error}") return Boolean(body.Data) setVPNEnabled: (val) -> diff --git a/src/device-state.coffee b/src/device-state.coffee index 8fead9bf..b20ad124 100644 --- a/src/device-state.coffee +++ b/src/device-state.coffee @@ -12,18 +12,17 @@ constants = require './lib/constants' validation = require './lib/validation' device = require './lib/device' updateLock = require './lib/update-lock' -migration = require './lib/migration' DeviceConfig = require './device-config' Logger = require './logger' ApplicationManager = require './application-manager' validateLocalState = (state) -> - if state.name? and !validation.isValidShortText(state.name) + if !state.name? or !validation.isValidShortText(state.name) throw new Error('Invalid device name') - if state.apps? and !validation.isValidAppsObject(state.apps) + if !state.apps? or !validation.isValidAppsObject(state.apps) throw new Error('Invalid apps') - if state.config? and !validation.isValidEnv(state.config) + if !state.config? or !validation.isValidEnv(state.config) throw new Error('Invalid device configuration') validateDependentState = (state) -> @@ -33,9 +32,13 @@ validateDependentState = (state) -> throw new Error('Invalid dependent devices') validateState = Promise.method (state) -> - throw new Error('State must be an object') if !_.isObject(state) - validateLocalState(state.local) if state.local? - validateDependentState(state.dependent) if state.dependent? + if !_.isObject(state) + throw new Error('State must be an object') + if !_.isObject(state.local) + throw new Error('Local state must be an object') + validateLocalState(state.local) + if state.dependent? + validateDependentState(state.dependent) class DeviceStateRouter constructor: (@deviceState) -> @@ -99,7 +102,7 @@ module.exports = class DeviceState extends EventEmitter @_currentVolatile = {} _lock = new Lock() @_writeLock = Promise.promisify(_lock.async.writeLock) - @_readLock = Promise.promisify(_lock.async.writeLock) + @_readLock = Promise.promisify(_lock.async.readLock) @lastSuccessfulUpdate = null @failedUpdates = 0 @stepsInProgress = [] @@ -124,119 +127,24 @@ module.exports = class DeviceState extends EventEmitter @applications.on('change', @reportCurrentState) - normaliseLegacy: ({ apps, dependentApps, dependentDevices }) => - legacyTarget = { local: { apps: [], config: {} }, dependent: { apps: [], devices: [] } } - - tryParseObj = (j) -> - try - JSON.parse(j) - catch - {} - tryParseArray = (j) -> - try - JSON.parse(j) - catch - [] - - dependentDevices = tryParseArray(dependentDevices) - # Old containers have to be killed as we can't update their labels - @deviceConfig.getCurrent() - .then (deviceConf) => - legacyTarget.local.config = deviceConf - console.log('Killing legacy containers') - @applications.services.killAllLegacy() + normaliseLegacy: => + # When legacy apps are present, we kill their containers and migrate their /data to a named volume + # (everything else is handled by the knex migration) + console.log('Killing legacy containers') + @applications.services.killAllLegacy() .then => - @config.get('name') - .then (name) -> - legacyTarget.local.name = name ? '' + console.log('Migrating legacy app volumes') + @applications.getTargetApps() + .map (app) => + @applications.volumes.createFromLegacy(app.appId) .then => - console.log('Migrating apps') - Promise.map tryParseArray(apps), (app) => - @applications.images.normalise(app.imageId) - .then (image) => - appAsState = { - image: image - commit: app.commit - name: app.name ? '' - environment: tryParseObj(app.env) - config: tryParseObj(app.config) - } - appAsState.environment = _.omitBy appAsState.environment, (v, k) -> - _.startsWith(k, 'RESIN_') - appId = app.appId - # Only for apps for which we have the image locally, - # we translate that app to the target state so that we run - # a (mostly) compatible container until a new target state is fetched. - # This is also necessary to avoid cleaning up the image, which may - # be needed for cache/delta in the next update. - multicontainerApp = migration.singleToMulticontainerApp(appAsState, appId) - @applications.images.inpectByName(appAsState.image) - .then => - @applications.images.markAsSupervised(@applications.imageForService(multicontainerApp.services['1'])) - .then => - if !_.isEmpty(appAsState.config) - devConf = @deviceConfig.filterConfigKeys(appAsState.config) - _.assign(legacyTarget.local.config, devConf) if !_.isEmpty(devConf) - legacyTarget.local.apps.push(multicontainerApp) - .then => - @applications.volumes.createFromLegacy(appId) - .catch (err) -> - console.error("Ignoring legacy app #{app.imageId} due to #{err}") - .then => - console.log('Migrating dependent apps and devices') - Promise.map tryParseArray(dependentApps), (app) => - appAsState = { - appId: app.appId - parentApp: app.parentAppId - image: app.imageId - releaseId: null - commit: app.commit - name: app.name - config: tryParseObj(app.config) - environment: tryParseObj(app.environment) - } - @applications.images.inspectByName(app.imageId) - .then => - @applications.images.markAsSupervised(@applications.proxyvisor.imageForDependentApp(appAsState)) - .then => - appForDB = _.clone(appAsState) - appForDB.config = JSON.stringify(appAsState.config) - appForDB.environment = JSON.stringify(appAsState.environment) - @db.models('dependentApp').insert(appForDB) - .then => - legacyTarget.dependent.apps.push(appAsState) - devicesForThisApp = _.filter(dependentDevices ? [], (d) -> d.appId == appAsState.appId) - if !_.isEmpty(devicesForThisApp) - devices = _.map devicesForThisApp, (d) -> - d.markedForDeletion ?= false - d.localId ?= null - d.is_managed_by ?= null - d.lock_expiry_date ?= null - return d - devicesForState = _.map devicesForThisApp, (d) -> - dev = { - uuid: d.uuid - name: d.name - apps: {} - } - dev.apps[d.appId] = { - config: tryParseObj(d.targetConfig) - environment: tryParseObj(d.targetEnvironment) - } - legacyTarget.dependent.devices = legacyTarget.dependent.devices.concat(devicesForState) - @db.models('dependentDevice').insert(devices) - .catch (err) -> - console.error("Ignoring legacy dependent app #{app.imageId} due to #{err}") - .then => - @setTarget(legacyTarget) - .then => - @config.set({ initialConfigSaved: 'true' }) + @config.set({ legacyAppsPresent: 'false' }) init: -> @config.getMany([ 'logsChannelSecret', 'pubnub', 'offlineMode', 'loggingEnabled', 'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant', 'version', 'provisioned', - 'resinApiEndpoint', 'connectivityCheckEnabled' + 'resinApiEndpoint', 'connectivityCheckEnabled', 'legacyAppsPresent' ]) .then (conf) => @logger.init({ @@ -247,8 +155,13 @@ module.exports = class DeviceState extends EventEmitter }) .then => @config.on 'change', (changedConfig) => - @logger.enable(changedConfig.loggingEnabled) if changedConfig.loggingEnabled? - @reportCurrentState(api_secret: changedConfig.apiSecret) if changedConfig.apiSecret? + if changedConfig.loggingEnabled? + @logger.enable(changedConfig.loggingEnabled) + if changedConfig.apiSecret? + @reportCurrentState(api_secret: changedConfig.apiSecret) + .then => + if validation.checkTruthy(conf.legacyAppsPresent) + @normaliseLegacy() .then => @applications.init() .then => @@ -271,14 +184,16 @@ module.exports = class DeviceState extends EventEmitter update_downloaded: false ) .then => - @loadTargetFromFile() if !conf.provisioned + if !conf.provisioned + @loadTargetFromFile() .then => @triggerApplyTarget() initNetworkChecks: ({ resinApiEndpoint, connectivityCheckEnabled }) => network.startConnectivityCheck(resinApiEndpoint, connectivityCheckEnabled) @config.on 'change', (changedConfig) -> - network.enableConnectivityCheck(changedConfig.connectivityCheckEnabled) if changedConfig.connectivityCheckEnabled? + if changedConfig.connectivityCheckEnabled? + network.enableConnectivityCheck(changedConfig.connectivityCheckEnabled) console.log('Starting periodic check for IP addresses') network.startIPAddressUpdate (addresses) => @reportCurrentState( @@ -296,31 +211,38 @@ module.exports = class DeviceState extends EventEmitter emitAsync: (ev, args...) => setImmediate => @emit(ev, args...) - readLockTarget: => + _readLockTarget: => @_readLock('target').disposer (release) -> release() - writeLockTarget: => + _writeLockTarget: => @_writeLock('target').disposer (release) -> release() - inferStepsLock: => + _inferStepsLock: => @_writeLock('inferSteps').disposer (release) -> release() + usingReadLockTarget: (fn) => + Promise.using @_readLockTarget, -> fn() + usingWriteLockTarget: (fn) => + Promise.using @_writeLockTarget, -> fn() + usingInferStepsLock: (fn) => + Promise.using @_inferStepsLock, -> fn() + setTarget: (target) -> validateState(target) .then => - Promise.using @writeLockTarget(), => + @usingWriteLockTarget => # Apps, deviceConfig, dependent @db.transaction (trx) => Promise.try => - @config.set({ name: target.local.name }, trx) if target.local?.name? + @config.set({ name: target.local.name }, trx) .then => - @deviceConfig.setTarget(target.local.config, trx) if target.local?.config? + @deviceConfig.setTarget(target.local.config, trx) .then => - @applications.setTarget(target.local?.apps, target.dependent, trx) + @applications.setTarget(target.local.apps, target.dependent, trx) getTarget: -> - Promise.using @readLockTarget(), => + @usingReadLockTarget => Promise.props({ local: Promise.props({ name: @config.get('name') @@ -337,7 +259,8 @@ module.exports = class DeviceState extends EventEmitter _.merge(theState.local, @_currentVolatile) theState.local.apps = appsStatus.local theState.dependent.apps = appsStatus.dependent - theState.local.is_on__commit = appsStatus.commit if appsStatus.commit + if appsStatus.commit + theState.local.is_on__commit = appsStatus.commit return theState getCurrentForComparison: -> @@ -425,12 +348,13 @@ module.exports = class DeviceState extends EventEmitter throw new Error("Invalid action #{step.action}") applyStepAsync: (step, { force, targetState }) => - return if @shuttingDown + if @shuttingDown + return @stepsInProgress.push(step) setImmediate => @executeStepAction(step, { force, targetState }) .finally => - Promise.using @inferStepsLock(), => + @usingInferStepsLock => _.pullAllWith(@stepsInProgress, [ step ], _.isEqual) .then (stepResult) => @emitAsync('step-completed', null, step, stepResult) @@ -456,7 +380,7 @@ module.exports = class DeviceState extends EventEmitter applyTarget: ({ force = false } = {}) => console.log('Applying target state') - Promise.using @inferStepsLock(), => + @usingInferStepsLock => Promise.join( @getCurrentForComparison() @getTarget() @@ -484,7 +408,8 @@ module.exports = class DeviceState extends EventEmitter @applyError(err, force) continueApplyTarget: ({ force = false } = {}) => - return if @applyContinueScheduled + if @applyContinueScheduled + return @applyContinueScheduled = true setTimeout( => @applyContinueScheduled = false @@ -492,6 +417,7 @@ module.exports = class DeviceState extends EventEmitter , 1000) return + # TODO: Make this and applyTarget work purely with promises, no need to wait on events triggerApplyTarget: ({ force = false, delay = 0 } = {}) => if @applyInProgress if !@scheduledApply? diff --git a/src/event-tracker.coffee b/src/event-tracker.coffee index d012149c..5c687bcf 100644 --- a/src/event-tracker.coffee +++ b/src/event-tracker.coffee @@ -39,7 +39,8 @@ module.exports = class EventTracker # Don't log private env vars (e.g. api keys) or other secrets - use a whitelist to mask what we send properties = mask(properties, mixpanelMask) @_logEvent('Event:', ev, JSON.stringify(properties)) - return if !@_client? + if !@_client? + return # Mutation is bad, and it should feel bad properties = _.assign(properties, @_properties) @_client.track(ev, properties) @@ -49,5 +50,6 @@ module.exports = class EventTracker @_properties = distinct_id: uuid uuid: uuid - return if offlineMode + if offlineMode + return @_client = mixpanel.init(mixpanelToken, { host: mixpanelHost }) diff --git a/src/lib/constants.coffee b/src/lib/constants.coffee index 8ec0d8f5..1b4d2149 100644 --- a/src/lib/constants.coffee +++ b/src/lib/constants.coffee @@ -31,4 +31,5 @@ module.exports = defaultMixpanelToken: process.env.DEFAULT_MIXPANEL_TOKEN allowedInterfaces: ['resin-vpn', 'tun0', 'docker0', 'lo'] appsJsonPath: process.env.APPS_JSON_PATH ? '/boot/apps.json' - ipAddressUpdateInterval: 30000 + ipAddressUpdateInterval: 30 * 1000 + imageCleanupErrorIgnoreTimeout: 3600 * 1000 diff --git a/src/lib/docker-utils.coffee b/src/lib/docker-utils.coffee index 427627a8..d2fdbb5e 100644 --- a/src/lib/docker-utils.coffee +++ b/src/lib/docker-utils.coffee @@ -44,6 +44,7 @@ module.exports = class DockerUtils extends DockerToolbelt repoName = imageName return { repo: repoName, tag: tagName } + # TODO: somehow fix this to work with image names having repo digests instead of tags rsyncImageWithProgress: (imgDest, fullDeltaOpts, onProgress) => { deltaRequestTimeout, deltaApplyTimeout, deltaRetryCount, deltaRetryInterval, diff --git a/src/lib/update-lock.coffee b/src/lib/update-lock.coffee index 24f630e4..09c10135 100644 --- a/src/lib/update-lock.coffee +++ b/src/lib/update-lock.coffee @@ -4,48 +4,53 @@ TypedError = require 'typed-error' lockFile = Promise.promisifyAll(require('lockfile')) Lock = require 'rwlock' fs = Promise.promisifyAll(require('fs')) +path = require 'path' constants = require './constants' ENOENT = (err) -> err.code is 'ENOENT' baseLockPath = (appId) -> - return "/tmp/resin-supervisor/services/#{appId}" + return path.join('/tmp/resin-supervisor/services', appId.toString()) + exports.lockPath = (appId, serviceName) -> - return "#{baseLockPath(appId)}/#{serviceName}" + return path.join(baseLockPath(appId), serviceName) lockFileOnHost = (appId, serviceName) -> - return "#{constants.rootMountPoint}#{exports.lockPath(appId, serviceName)}/resin-updates.lock" + return path.join(constants.rootMountPoint, exports.lockPath(appId, serviceName), 'resin-updates.lock') exports.UpdatesLockedError = class UpdatesLockedError extends TypedError exports.lock = do -> _lock = new Lock() _writeLock = Promise.promisify(_lock.async.writeLock) - return (appId, { force = false } = {}) -> - Promise.try -> - return if !appId? - locksTaken = [] - dispose = (release) -> - Promise.map locksTaken, (lockName) -> - lockFile.unlockAsync(lockName) - .finally -> - release() - _writeLock(appId) - .tap (release) -> - fs.readdirAsync(baseLockPath(appId)) - .catch ENOENT, -> [] - .mapSeries (serviceName) -> - tmpLockName = lockFileOnHost(appId, serviceName) - Promise.try -> - lockFile.unlockAsync(tmpLockName) if force == true - .then -> - lockFile.lockAsync(tmpLockName) + return (appId, { force = false } = {}, fn) -> + takeTheLock = -> + Promise.try -> + return if !appId? + locksTaken = [] + dispose = (release) -> + Promise.map locksTaken, (lockName) -> + lockFile.unlockAsync(lockName) + .finally -> + release() + _writeLock(appId) + .tap (release) -> + theLockDir = path.join(constants.rootMountPoint, baseLockPath(appId)) + fs.readdirAsync(theLockDir) + .catch ENOENT, -> [] + .mapSeries (serviceName) -> + tmpLockName = lockFileOnHost(appId, serviceName) + Promise.try -> + lockFile.unlockAsync(tmpLockName) if force == true .then -> - locksTaken.push(tmpLockName) - .catch ENOENT, _.noop - .catch (err) -> - dispose(release) - .finally -> - throw new exports.UpdatesLockedError("Updates are locked: #{err.message}") - .disposer(dispose) + lockFile.lockAsync(tmpLockName) + .then -> + locksTaken.push(tmpLockName) + .catch ENOENT, _.noop + .catch (err) -> + dispose(release) + .finally -> + throw new exports.UpdatesLockedError("Updates are locked: #{err.message}") + .disposer(dispose) + Promise.using takeTheLock(), -> fn() diff --git a/src/migrations/20171129013519_legacy.js b/src/migrations/20171129013519_legacy.js new file mode 100644 index 00000000..e6ba98a1 --- /dev/null +++ b/src/migrations/20171129013519_legacy.js @@ -0,0 +1,136 @@ +// This migration implements the legacy db schema used in supervisors lower than 7.0.0. + +// It's a bit ugly for a migration (it's unusual that migrations check for existence of tables and columns) +// but being the first migration for a legacy system, this is the easiest way to bring the db +// to a known schema to start doing proper migrations afterwards. +// For reference, compare this to db.coffee in old supervisors (e.g. v6.4.2), but consider we've added +// a few dropColumn and dropTable calls to delete things that were removed throughout the supervisor's +// history without actually adding drop statements (mostly just becoming unused, but still there). + +exports.up = function (knex, Promise) { + let addColumn = function (table, column, type) { + return knex.schema.hasColumn(table, column) + .then((exists) => { + if (!exists) { + return knex.schema.table(table, (t) => { return t[type](column) }) + } + }) + } + let dropColumn = function (table, column) { + return knex.schema.hasColumn(table, column) + .then((exists) => { + if (exists) { + return knex.schema.table(table, (t) => { return t.dropColumn(column) }) + } + }) + } + let createTableOrRun = function (tableName, tableCreator, runIfTableExists) { + return knex.schema.hasTable(tableName) + .then((exists) => { + if (!exists) { + return knex.schema.createTable(tableName, tableCreator) + } else if (runIfTableExists != null) { + return runIfTableExists() + } + }) + } + let dropTable = function (tableName) { + return knex.schema.hasTable(tableName) + .then((exists) => { + if (exists) { + return knex.schema.dropTable(tableName) + } + }) + } + return Promise.all([ + createTableOrRun('config', (t) => { + t.string('key').primary() + t.string('value') + }), + createTableOrRun('deviceConfig', (t) => { + t.json('values') + t.json('targetValues') + }).then(() => { + return knex('deviceConfig').select() + .then((deviceConfigs) => { + if (deviceConfigs.length == 0) { + return knex('deviceConfig').insert({ values: '{}', targetValues: '{}' }) + } + }) + }), + createTableOrRun('app', (t) => { + t.increments('id').primary() + t.string('name') + t.string('containerName') + t.string('commit') + t.string('imageId') + t.string('appId') + t.boolean('privileged') + t.json('env') + t.json('config') + t.boolean('markedForDeletion') + }, () => { + return Promise.all([ + addColumn('app', 'commit', 'string'), + addColumn('app', 'appId', 'string'), + addColumn('app', 'containerName', 'string'), + addColumn('app', 'config', 'json'), + addColumn('app', 'markedForDeletion', 'boolean'), + dropColumn('app', 'containerId') + ]).then(() => { + //When updating from older supervisors, config can be null + return knex('app').update({ config: '{}' }).whereNull('config') + .then(() => { + knex('app').update({ markedForDeletion: false }).whereNull('markedForDeletion') + }) + }) + }), + createTableOrRun('dependentApp', (t) => { + t.increments('id').primary() + t.string('appId') + t.string('parentAppId') + t.string('name') + t.string('commit') + t.string('imageId') + t.json('config') + t.json('environment') + }, () => { + return addColumn('dependentApp', 'environment', 'json') + }), + createTableOrRun('dependentDevice', (t) => { + t.increments('id').primary() + t.string('uuid') + t.string('appId') + t.string('localId') + t.string('device_type') + t.string('logs_channel') + t.string('deviceId') + t.boolean('is_online') + t.string('name') + t.string('status') + t.string('download_progress') + t.string('is_managed_by') + t.dateTime('lock_expiry_date') + t.string('commit') + t.string('targetCommit') + t.json('environment') + t.json('targetEnvironment') + t.json('config') + t.json('targetConfig') + t.boolean('markedForDeletion') + }, () => { + return Promise.all([ + addColumn('dependentDevice', 'markedForDeletion', 'boolean'), + addColumn('dependentDevice', 'localId', 'string'), + addColumn('dependentDevice', 'is_managed_by', 'string'), + addColumn('dependentDevice', 'lock_expiry_date', 'dateTime') + ]) + }), + dropTable('image'), + dropTable('container') + ]) +} + +exports.down = function(knex, Promise) { + return Promise.try(() => { throw new Error('Not implemented') }) +} diff --git a/src/migrations/20171129064057_multicontainer.js b/src/migrations/20171129064057_multicontainer.js new file mode 100644 index 00000000..8c2d2ce3 --- /dev/null +++ b/src/migrations/20171129064057_multicontainer.js @@ -0,0 +1,309 @@ + +var defaultLegacyVolume = function (appId) { + return `resin-data-${appId}` +} + +var tryParse = function (obj) { + try { + return JSON.parse(obj) + } + catch(e) { + return {} + } +} + +var singleToMulticontainerApp = function (app, appId) { + // From *very* old supervisors, env or config may be null + // so we ignore errors parsing them + let conf = tryParse(app.config) + let env = tryParse(app.env) + let environment = {} + for (let key in env) { + if (!/^RESIN_/.test(key)) { + environment[key] = env[key] + } + } + let newApp = { + appId: appId, + commit: app.commit, + name: app.name, + releaseId: 1, + networks: {}, + volumes: {} + } + let defaultVolume = defaultLegacyVolume(appId) + newApp.volumes[defaultVolume] = {} + let updateStrategy = conf['RESIN_SUPERVISOR_UPDATE_STRATEGY'] + if (updateStrategy == null) { + updateStrategy = 'download-then-kill' + } + let handoverTimeout = conf['RESIN_SUPERVISOR_HANDOVER_TIMEOUT'] + if (handoverTimeout == null) { + handoverTimeout = '' + } + let restartPolicy = conf['RESIN_APP_RESTART_POLICY'] + if (restartPolicy == null) { + restartPolicy = 'unless-stopped' + } + newApp.services = [ + { + serviceId: 1, + appId: appId, + serviceName: 'main', + imageId: 1, + commit: app.commit, + releaseId: 1, + image: app.imageId, + privileged: true, + network_mode: 'host', + volumes: [ + `${defaultVolume}:/data` + ], + labels: { + 'io.resin.features.kernel_modules': '1', + 'io.resin.features.firmware': '1', + 'io.resin.features.dbus': '1', + 'io.resin.features.supervisor_api': '1', + 'io.resin.features.resin_api': '1', + 'io.resin.update.strategy': updateStrategy, + 'io.resin.update.handover_timeout': handoverTimeout + }, + environment: environment, + restart: restartPolicy, + running: true + } + ] + return newApp +} + +var jsonifyAppFields = function (app) { + let newApp = Object.assign({}, app) + newApp.services = JSON.stringify(app.services) + newApp.networks = JSON.stringify(app.networks) + newApp.volumes = JSON.stringify(app.volumes) + return newApp +} + +var imageForApp = function (app) { + let service = app.services[0] + return { + name: service.image, + appId: service.appId, + serviceId: service.serviceId, + serviceName: service.serviceName, + imageId: service.imageId, + releaseId: service.releaseId, + dependent: 0 + } +} + +var imageForDependentApp = function (app) { + return { + name: app.image, + appId: app.appId, + serviceId: null, + serviceName: null, + imageId: app.imageId, + releaseId: null, + dependent: 1 + } +} + +// TODO: this whole thing is WIP +exports.up = function (knex, Promise) { + return knex.schema.createTable('image', (t) => { + t.increments('id').primary() + t.string('name') + t.integer('appId') + t.integer('serviceId') + t.string('serviceName') + t.integer('imageId') + t.integer('releaseId') + t.boolean('dependent') + }) + .then(() => knex('app').select().whereNot({ markedForDeletion: true }).orWhereNull('markedForDeletion')) + .tap((apps) => { + if (apps.length > 0) { + return knex('config').insert({ key: 'legacyAppsPresent', value: 'true' }) + } + }) + .tap(() => { + // We're in a transaction, and it's easier to drop and recreate + // than to migrate each field... + return knex.schema.dropTable('app') + .then(() => { + return knex.schema.createTable('app', (t) => { + t.increments('id').primary() + t.string('name') + t.integer('releaseId') + t.string('commit') + t.integer('appId') + t.json('services') + t.json('networks') + t.json('volumes') + }) + }) + }) + .map((app) => { + let migratedApp = singleToMulticontainerApp(app) + return knex('app').insert(jsonifyAppFields(migratedApp)) + .then(() => knex('image').insert(imageForApp(migratedApp))) + }) + .then(() => { + // For some reason dropping a column in this table doesn't work. Anyways, we don't want to store old targetValues. + // Instead, on first run the supervisor will store current device config values as targets - so we want to + // make the old values that refer to supervisor config be the *current* values, and we do that by inserting + // to the config table. + return knex('deviceConfig').select() + .then((deviceConf) => { + return knex.schema.dropTable('deviceConfig') + .then(() => { + let values = JSON.parse(deviceConf[0].values) + let promises = [] + let configKeys = { + 'RESIN_SUPERVISOR_POLL_INTERVAL': 'appUpdatePollInterval', + 'RESIN_SUPERVISOR_LOCAL_MODE': 'localMode', + 'RESIN_SUPERVISOR_CONNECTIVITY_CHECK': 'connectivityCheckEnabled', + 'RESIN_SUPERVISOR_LOG_CONTROL': 'loggingEnabled', + 'RESIN_SUPERVISOR_DELTA': 'delta', + 'RESIN_SUPERVISOR_DELTA_REQUEST_TIMEOUT': 'deltaRequestTimeout', + 'RESIN_SUPERVISOR_DELTA_APPLY_TIMEOUT': 'deltaApplyTimeout', + 'RESIN_SUPERVISOR_DELTA_RETRY_COUNT': 'deltaRetryCount', + 'RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL': 'deltaRequestTimeout', + 'RESIN_SUPERVISOR_OVERRIDE_LOCK': 'lockOverride' + } + for (let envVarName in values) { + if (configKeys[envVarName] != null) { + promises.push(knex('config').insert({ key: configKeys[envVarName], value: values[envVarName]})) + } + } + return Promise.all(promises) + }) + }) + .then(() => { + return knex.schema.createTable('deviceConfig', (t) => { + t.json('targetValues') + }) + }) + .then(() => knex('deviceConfig').insert({ targetValues: '{}' })) + }) + .then(() => knex('dependentApp').select()) + .then((dependentApps) => { + return knex.schema.dropTable('dependentApp') + .then(() => { + return knex.schema.createTable('dependentApp', (t) => { + t.increments('id').primary() + t.integer('appId') + t.integer('parentApp') + t.string('name') + t.string('commit') + t.integer('releaseId') + t.integer('imageId') + t.string('image') + t.json('environment') + t.json('config') + }) + }) + .then(() => { + return knex.schema.createTable('dependentAppTarget', (t) => { + t.increments('id').primary() + t.integer('appId') + t.integer('parentApp') + t.string('name') + t.string('commit') + t.integer('releaseId') + t.integer('imageId') + t.string('image') + t.json('environment') + t.json('config') + }) + }) + .then(() => { + return Promise.map(dependentApps, (app) => { + let newApp = { + appId: parseInt(app.appId), + parentApp: parseInt(app.parentAppId), + image: app.imageId, + releaseId: null, + commit: app.commit, + name: app.name, + config: JSON.stringify(tryParse(app.config)), + environment: JSON.stringify(tryParse(app.environment)) + } + let image = imageForDependentApp(newApp) + return knex('image').insert(image) + .then(() => knex('dependentApp').insert(newApp)) + .then(() => knex('dependentAppTarget').insert(newApp)) + }) + }) + }) + .then(() => knex('dependentDevice').select()) + .then((dependentDevices) => { + return knex.schema.dropTable('dependentDevice') + .then(() => { + return knex.schema.createTable('dependentDevice', (t) => { + t.increments('id').primary() + t.string('uuid') + t.integer('appId') + t.string('localId') + t.string('device_type') + t.string('logs_channel') + t.integer('deviceId') + t.boolean('is_online') + t.string('name') + t.string('status') + t.string('download_progress') + t.integer('is_managed_by') + t.dateTime('lock_expiry_date') + t.string('commit') + t.string('targetCommit') + t.json('environment') + t.json('targetEnvironment') + t.json('config') + t.json('targetConfig') + t.boolean('markedForDeletion') + }) + }) + .then(() => { + return knex.schema.createTable('dependentDeviceTarget', (t) => { + t.increments('id').primary() + t.string('uuid') + t.string('name') + t.json('apps') + }) + }) + .then(() => { + return Promise.map(dependentDevices, (device) => { + let newDevice = Object.assign({}, device) + newDevice.appId = parseInt(device.appId) + newDevice.deviceId = parseInt(device.deviceId) + if (device.is_managed_by != null) { + newDevice.is_managed_by = parseInt(device.is_managed_by) + } + newDevice.config = JSON.stringify(tryParse(device.config)) + newDevice.environment = JSON.stringify(tryParse(device.environment)) + newDevice.targetConfig = JSON.stringify(tryParse(device.targetConfig)) + newDevice.targetEnvironment = JSON.stringify(tryParse(device.targetEnvironment)) + if (newDevice.markedForDeletion == null) { + newDevice.markedForDeletion = false + } + let deviceTarget = { + uuid: device.uuid, + name: device.name, + apps: {} + } + deviceTarget.apps[device.appId] = { + commit: newDevice.targetCommit, + config: newDevice.targetConfig, + environment: newDevice.targetEnvironment + } + return knex('dependentDevice').insert(newDevice) + .then(() => knex('dependentDeviceTarget').insert(deviceTarget)) + }) + }) + }) +} + +exports.down = function(knex, Promise) { + return Promise.try(() => { throw new Error('Not implemented') }) +} \ No newline at end of file diff --git a/src/proxyvisor.coffee b/src/proxyvisor.coffee index ac09405b..63e32cf2 100644 --- a/src/proxyvisor.coffee +++ b/src/proxyvisor.coffee @@ -4,6 +4,7 @@ express = require 'express' fs = Promise.promisifyAll require 'fs' { request } = require './lib/request' constants = require './lib/constants' +{ checkInt } = require './lib/validation' path = require 'path' mkdirp = Promise.promisify(require('mkdirp')) bodyParser = require 'body-parser' @@ -237,90 +238,90 @@ module.exports = class Proxyvisor @lastRequestForDevice = {} @_router = new ProxyvisorRouter(this) @router = @_router.router - @validActions = [ 'updateDependentTargets', 'sendDependentHooks', 'removeDependentApp' ] + @validActions = _.keys(@actionExecutors) + @actionExecutors = { + updateDependentTargets: (step) => + @config.getMany([ 'currentApiKey', 'apiTimeout' ]) + .then ({ currentApiKey, apiTimeout }) => + # - take each of the step.devices and update dependentDevice with it (targetCommit, targetEnvironment, targetConfig) + # - if update returns 0, then use APIBinder to fetch the device, then store it to the db + # - set markedForDeletion: true for devices that are not in the step.devices list + # - update dependentApp with step.app + Promise.map step.devices, (device) => + uuid = device.uuid + # Only consider one app per dependent device for now + appId = _(device.apps).keys().head() + targetCommit = device.apps[appId].commit + targetEnvironment = JSON.stringify(device.apps[appId].environment) + targetConfig = JSON.stringify(device.apps[appId].config) + @db.models('dependentDevice').update({ appId, targetEnvironment, targetConfig, targetCommit, name: device.name }).where({ uuid }) + .then (n) => + return if n != 0 + # If the device is not in the DB it means it was provisioned externally + # so we need to fetch it. + @apiBinder.fetchDevice(uuid, currentApiKey, apiTimeout) + .then (dev) => + deviceForDB = { + uuid: uuid + appId: appId + device_type: dev.device_type + deviceId: dev.id + is_online: dev.is_online + name: dev.name + status: dev.status + logs_channel: dev.logs_channel + targetCommit + targetConfig + targetEnvironment + } + @db.models('dependentDevice').insert(deviceForDB) + .then => + @db.models('dependentDevice').where({ appId: step.appId }).whereNotIn('uuid', _.map(step.devices, 'uuid')).update({ markedForDeletion: true }) + .then => + @normaliseDependentAppForDB(step.app) + .then (appForDB) => + @db.upsertModel('dependentApp', appForDB, { appId: step.appId }) + .then -> + cleanupTars(step.appId, step.app.commit) + + sendDependentHooks: (step) => + Promise.join( + @config.get('apiTimeout') + @getHookEndpoint(step.appId) + (apiTimeout, endpoint) => + Promise.mapSeries step.devices, (device) => + Promise.try => + if @lastRequestForDevice[device.uuid]? + diff = Date.now() - @lastRequestForDevice[device.uuid] + if diff < 30000 + Promise.delay(30001 - diff) + .then => + @lastRequestForDevice[device.uuid] = Date.now() + if device.markedForDeletion + @sendDeleteHook(device, apiTimeout, endpoint) + else + @sendUpdate(device, apiTimeout, endpoint) + ) + + removeDependentApp: (step) => + # find step.app and delete it from the DB + # find devices with step.appId and delete them from the DB + @db.transaction (trx) -> + trx('dependentApp').where({ appId: step.appId }).del() + .then -> + trx('dependentDevice').where({ appId: step.appId }).del() + .then -> + cleanupTars(step.appId) + + } bindToAPI: (apiBinder) => @apiBinder = apiBinder executeStepAction: (step) => Promise.try => - actions = { - updateDependentTargets: => - @config.getMany([ 'currentApiKey', 'apiTimeout' ]) - .then ({ currentApiKey, apiTimeout }) => - # - take each of the step.devices and update dependentDevice with it (targetCommit, targetEnvironment, targetConfig) - # - if update returns 0, then use APIBinder to fetch the device, then store it to the db - # - set markedForDeletion: true for devices that are not in the step.devices list - # - update dependentApp with step.app - Promise.map step.devices, (device) => - uuid = device.uuid - # Only consider one app per dependent device for now - appId = _(device.apps).keys().head() - targetCommit = device.apps[appId].commit - targetEnvironment = JSON.stringify(device.apps[appId].environment) - targetConfig = JSON.stringify(device.apps[appId].config) - @db.models('dependentDevice').update({ appId, targetEnvironment, targetConfig, targetCommit, name: device.name }).where({ uuid }) - .then (n) => - return if n != 0 - # If the device is not in the DB it means it was provisioned externally - # so we need to fetch it. - @apiBinder.fetchDevice(uuid, currentApiKey, apiTimeout) - .then (dev) => - deviceForDB = { - uuid: uuid - appId: appId - device_type: dev.device_type - deviceId: dev.id - is_online: dev.is_online - name: dev.name - status: dev.status - logs_channel: dev.logs_channel - targetCommit - targetConfig - targetEnvironment - } - @db.models('dependentDevice').insert(deviceForDB) - .then => - @db.models('dependentDevice').where({ appId: step.appId }).whereNotIn('uuid', _.map(step.devices, 'uuid')).update({ markedForDeletion: true }) - .then => - @normaliseDependentAppForDB(step.app) - .then (appForDB) => - @db.upsertModel('dependentApp', appForDB, { appId: step.appId }) - .then -> - cleanupTars(step.appId, step.app.commit) - - sendDependentHooks: => - Promise.join( - @config.get('apiTimeout') - @getHookEndpoint(step.appId) - (apiTimeout, endpoint) => - Promise.mapSeries step.devices, (device) => - Promise.try => - if @lastRequestForDevice[device.uuid]? - diff = Date.now() - @lastRequestForDevice[device.uuid] - if diff < 30000 - Promise.delay(30001 - diff) - .then => - @lastRequestForDevice[device.uuid] = Date.now() - if device.markedForDeletion - @sendDeleteHook(device, apiTimeout, endpoint) - else - @sendUpdate(device, apiTimeout, endpoint) - ) - - removeDependentApp: => - # find step.app and delete it from the DB - # find devices with step.appId and delete them from the DB - @db.transaction (trx) -> - trx('dependentApp').where({ appId: step.appId }).del() - .then -> - trx('dependentDevice').where({ appId: step.appId }).del() - .then -> - cleanupTars(step.appId) - - } - throw new Error("Invalid proxyvisor action #{step.action}") if !actions[step.action]? - actions[step.action]() + throw new Error("Invalid proxyvisor action #{step.action}") if !@actionsExecutors[step.action]? + @actionExecutors[step.action](step) getCurrentStates: => Promise.join( @@ -384,7 +385,7 @@ module.exports = class Proxyvisor if dependent?.apps? appsArray = _.map dependent.apps, (app, appId) -> appClone = _.clone(app) - appClone.appId = appId + appClone.appId = checkInt(appId) return appClone Promise.map(appsArray, @normaliseDependentAppForDB) .then (appsForDB) => @@ -433,7 +434,7 @@ module.exports = class Proxyvisor normaliseDependentDeviceFromDB: (device) -> Promise.try -> outDevice = _.clone(device) - _.forEach [ 'environment', 'config', 'targetEnvironment', 'targetConfig' ], (prop) -> + for prop in [ 'environment', 'config', 'targetEnvironment', 'targetConfig' ] outDevice[prop] = JSON.parse(device[prop]) return outDevice diff --git a/src/supervisor-api.coffee b/src/supervisor-api.coffee index 3d07149d..b9055324 100644 --- a/src/supervisor-api.coffee +++ b/src/supervisor-api.coffee @@ -1,4 +1,3 @@ -_ = require 'lodash' express = require 'express' bufferEq = require 'buffer-equal-constant-time' blink = require './lib/blink' @@ -52,7 +51,7 @@ module.exports = class SupervisorAPI .catch (err) -> res.status(503).send(err?.message or err or 'Unknown error') - _.forEach @routers, (router) => + for router in @routers @_api.use(router) listen: (allowedInterfaces, port, apiTimeout) => diff --git a/src/supervisor.coffee b/src/supervisor.coffee index 4995eff3..80d94b15 100644 --- a/src/supervisor.coffee +++ b/src/supervisor.coffee @@ -30,33 +30,14 @@ module.exports = class Supervisor extends EventEmitter @deviceState.applications.proxyvisor.bindToAPI(@apiBinder) @api = new SupervisorAPI({ @config, @eventTracker, routers: [ @apiBinder.router, @deviceState.router ] }) - normaliseState: => + init: => @db.init() .tap => @config.init() # Ensures uuid, deviceApiKey, apiSecret and logsChannel - .then (needsMigration) => - # We're updating from an older supervisor, so we need to mark images as supervised and remove all containers - if needsMigration - @db.models('legacyData').select() - .then ([ legacyData ]) => - if !legacyData? - console.log('No legacy data found, skipping migration') - return - @deviceState.normaliseLegacy(legacyData) - .then => - @db.finishMigration() - - init: => - @normaliseState() .then => @config.getMany(startupConfigFields) .then (conf) => - @eventTracker.init({ - offlineMode: conf.offlineMode - mixpanelToken: conf.mixpanelToken - mixpanelHost: conf.mixpanelHost - uuid: conf.uuid - }) + @eventTracker.init(conf) .then => @eventTracker.track('Supervisor start') @deviceState.init() diff --git a/webpack.config.js b/webpack.config.js index c440344c..9d77664b 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -4,6 +4,7 @@ var fs = require('fs'); var _ = require('lodash'); var path = require('path'); var UglifyPlugin = require("uglifyjs-webpack-plugin"); +const CopyWebpackPlugin = require('copy-webpack-plugin'); var externalModules = [ 'mkfifo', @@ -54,7 +55,17 @@ module.exports = function (env) { let plugins = [ new webpack.DefinePlugin({ 'process.env.NODE_ENV': '"production"', - }) + }), + new CopyWebpackPlugin([ + { + from: './src/migrations', + to: 'migrations' + } + ]), + new webpack.ContextReplacementPlugin( + /\.\/migrations/, + path.resolve(__dirname, 'src/migrations') + ) ] if (env == null || !env.noOptimize) { plugins.push(new UglifyPlugin()) @@ -69,8 +80,15 @@ module.exports = function (env) { extensions: [".js", ".json", ".coffee"] }, target: 'node', + node: { + __dirname: false + }, module: { rules: [ + { + test: /knex\/lib\/migrate\/index\.js$/, + use: require.resolve('./hardcode-migrations') + }, { test: /JSONStream\/index\.js$/, use: require.resolve('./fix-jsonstream')