diff --git a/package.json b/package.json
index 4ef3f07a..5b5869da 100644
--- a/package.json
+++ b/package.json
@@ -41,6 +41,7 @@
   "memoizee": "^0.4.1",
   "mixpanel": "0.0.20",
   "mkdirp": "^0.5.1",
+  "ncp": "^2.0.0",
   "network-checker": "~0.0.5",
   "node-loader": "^0.6.0",
   "null-loader": "^0.1.1",
diff --git a/src/api.coffee b/src/api.coffee
deleted file mode 100644
index 7a25ad66..00000000
--- a/src/api.coffee
+++ /dev/null
@@ -1,257 +0,0 @@
-Promise = require 'bluebird'
-utils = require './utils'
-express = require 'express'
-bodyParser = require 'body-parser'
-bufferEq = require 'buffer-equal-constant-time'
-config = require './config'
-device = require './device'
-_ = require 'lodash'
-proxyvisor = require './proxyvisor'
-hostConfig = require './host-config'
-
-module.exports = (application) ->
-	authenticate = (req, res, next) ->
-		queryKey = req.query.apikey
-		header = req.get('Authorization') ? ''
-		match = header.match(/^ApiKey (\w+)$/)
-		headerKey = match?[1]
-		utils.getOrGenerateSecret('api')
-		.then (secret) ->
-			if queryKey? && bufferEq(new Buffer(queryKey), new Buffer(secret))
-				next()
-			else if headerKey? && bufferEq(new Buffer(headerKey), new Buffer(secret))
-				next()
-			else if application.localMode
-				next()
-			else
-				res.sendStatus(401)
-		.catch (err) ->
-			# This should never happen...
-			res.status(503).send('Invalid API key in supervisor')
-
-	api = express()
-	unparsedRouter = express.Router()
-	parsedRouter = express.Router()
-	unauthenticatedRouter = express.Router()
-
-	parsedRouter.use(bodyParser())
-	parsedRouter.use(authenticate)
-	unparsedRouter.use(authenticate)
-
-	unparsedRouter.get '/ping', (req, res) ->
-		res.send('OK')
-
-	unparsedRouter.post '/v1/blink', (req, res) ->
-		utils.mixpanelTrack('Device blink')
-		utils.blink.pattern.start()
-		setTimeout(utils.blink.pattern.stop, 15000)
-		res.sendStatus(200)
-
-	parsedRouter.post '/v1/update', (req, res) ->
-		utils.mixpanelTrack('Update notification')
-		application.update(req.body.force)
-		res.sendStatus(204)
-
-	parsedRouter.post '/v1/reboot', (req, res) ->
-		force = req.body.force
-		Promise.map utils.getKnexApps(), (theApp) ->
-			Promise.using application.lockUpdates(theApp.appId, force), ->
-				# There's a slight chance the app changed after the previous select
-				# So we fetch it again now the lock is acquired
-				utils.getKnexApp(theApp.appId)
-				.then (app) ->
-					application.kill(app, removeContainer: false) if app?
-		.then ->
-			application.logSystemMessage('Rebooting', {}, 'Reboot')
-			device.reboot()
-		.then (response) ->
-			res.status(202).json(response)
-		.catch (err) ->
-			if err instanceof application.UpdatesLockedError
-				status = 423
-			else
-				status = 500
-			res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' })
-
-	parsedRouter.post '/v1/shutdown', (req, res) ->
-		force = req.body.force
-		Promise.map utils.getKnexApps(), (theApp) ->
-			Promise.using application.lockUpdates(theApp.appId, force), ->
-				# There's a slight chance the app changed after the previous select
-				# So we fetch it again now the lock is acquired
-				utils.getKnexApp(theApp.appId)
-				.then (app) ->
-					application.kill(app, removeContainer: false) if app?
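The reboot and shutdown endpoints above both serialise against the per-app update locks through bluebird's disposer pattern, which is also how `lockUpdates` is consumed throughout the rest of this diff. A minimal runnable sketch of that pattern, not part of the diff itself: `rwlock` and `bluebird` are the same libraries the supervisor uses, while `lockApp` and the `"app-#{appId}"` key are illustrative names.

```coffee
Promise = require 'bluebird'
Lock = require 'rwlock'

lock = new Lock()
writeLock = Promise.promisify(lock.async.writeLock)

# Acquire a per-app lock and wrap it in a disposer, so the release
# callback runs even when the work inside Promise.using throws.
lockApp = (appId) ->
	writeLock("app-#{appId}")
	.disposer (release) ->
		release()

Promise.using lockApp(123), ->
	# The lock is held for exactly the lifetime of this promise chain.
	console.log('app 123 is locked here')
```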
- .then -> - application.logSystemMessage('Shutting down', {}, 'Shutdown') - device.shutdown() - .then (response) -> - res.status(202).json(response) - .catch (err) -> - if err instanceof application.UpdatesLockedError - status = 423 - else - status = 500 - res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' }) - - parsedRouter.post '/v1/purge', (req, res) -> - appId = req.body.appId - application.logSystemMessage('Purging /data', { appId }, 'Purge /data') - if !appId? - return res.status(400).send('Missing app id') - Promise.using application.lockUpdates(appId, true), -> - utils.getKnexApp(appId) - .then (app) -> - application.kill(app) - .then -> - new Promise (resolve, reject) -> - utils.gosuper.post('/v1/purge', { json: true, body: applicationId: appId }) - .on('error', reject) - .on('response', -> resolve()) - .pipe(res) - .then -> - application.logSystemMessage('Purged /data', { appId }, 'Purge /data success') - .finally -> - application.start(app) - .catch (err) -> - status = 503 - if err instanceof utils.AppNotFoundError - errMsg = "App not found: an app needs to be installed for purge to work. - If you've recently moved this device from another app, - please push an app and wait for it to be installed first." - err = new Error(errMsg) - status = 400 - application.logSystemMessage("Error purging /data: #{err}", { appId, error: err }, 'Purge /data error') - res.status(status).send(err?.message or err or 'Unknown error') - - unparsedRouter.post '/v1/tcp-ping', (req, res) -> - utils.disableCheck(false) - res.sendStatus(204) - - unparsedRouter.delete '/v1/tcp-ping', (req, res) -> - utils.disableCheck(true) - res.sendStatus(204) - - parsedRouter.post '/v1/restart', (req, res) -> - appId = req.body.appId - force = req.body.force - utils.mixpanelTrack('Restart container', appId) - if !appId? - return res.status(400).send('Missing app id') - Promise.using application.lockUpdates(appId, force), -> - utils.getKnexApp(appId) - .then (app) -> - application.kill(app) - .then -> - application.start(app) - .then -> - res.status(200).send('OK') - .catch utils.AppNotFoundError, (e) -> - return res.status(400).send(e.message) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - parsedRouter.post '/v1/apps/:appId/stop', (req, res) -> - { appId } = req.params - { force } = req.body - utils.mixpanelTrack('Stop container', appId) - if !appId? - return res.status(400).send('Missing app id') - Promise.using application.lockUpdates(appId, force), -> - utils.getKnexApp(appId) - .then (app) -> - application.getContainerId(app) - .then (containerId) -> - application.kill(app, removeContainer: false) - .then (app) -> - res.json({ containerId }) - .catch utils.AppNotFoundError, (e) -> - return res.status(400).send(e.message) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - unparsedRouter.post '/v1/apps/:appId/start', (req, res) -> - { appId } = req.params - utils.mixpanelTrack('Start container', appId) - if !appId? 
- return res.status(400).send('Missing app id') - Promise.using application.lockUpdates(appId), -> - utils.getKnexApp(appId) - .tap (app) -> - application.start(app) - .then -> - application.getContainerId(app) - .then (containerId) -> - res.json({ containerId }) - .catch utils.AppNotFoundError, (e) -> - return res.status(400).send(e.message) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - unparsedRouter.get '/v1/apps/:appId', (req, res) -> - { appId } = req.params - utils.mixpanelTrack('GET app', appId) - if !appId? - return res.status(400).send('Missing app id') - Promise.using application.lockUpdates(appId, true), -> - columns = [ 'appId', 'containerName', 'commit', 'imageId', 'env' ] - utils.getKnexApp(appId, columns) - .then (app) -> - application.getContainerId(app) - .then (containerId) -> - # Don't return keys on the endpoint - app.env = _.omit(JSON.parse(app.env), config.privateAppEnvVars) - app.containerId = containerId - # Don't return data that will be of no use to the user - res.json(_.omit(app, [ 'containerName' ])) - .catch utils.AppNotFoundError, (e) -> - return res.status(400).send(e.message) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - # Expires the supervisor's API key and generates a new one. - # It also communicates the new key to the Resin API. - unparsedRouter.post '/v1/regenerate-api-key', (req, res) -> - utils.newSecret('api') - .then (secret) -> - device.updateState(api_secret: secret) - res.status(200).send(secret) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - parsedRouter.get '/v1/device/host-config', (req, res) -> - hostConfig.get() - .then (conf) -> - res.json(conf) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - parsedRouter.patch '/v1/device/host-config', (req, res) -> - hostConfig.patch(req.body) - .then -> - res.status(200).send('OK') - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - unparsedRouter.get '/v1/device', (req, res) -> - res.json(device.getState()) - - unauthenticatedRouter.get '/v1/healthy', (req, res) -> - # Has the update cycle not hung? (unless we're downloading an image) - healthy = application.healthy() - # If we're connected and we know it, has the current state been reported? - healthy and= device.stateReportHealthy() - # As far as we know, is gosuper healthy? 
- healthy and= device.gosuperHealthy() - - if healthy - res.sendStatus(200) - else - res.sendStatus(500) - - api.use(unauthenticatedRouter) - api.use(unparsedRouter) - api.use(parsedRouter) - api.use(proxyvisor.router) - - return api diff --git a/src/app.coffee b/src/app.coffee index 2f98d772..84bcf392 100644 --- a/src/app.coffee +++ b/src/app.coffee @@ -1,71 +1,6 @@ require('log-timestamp') -process.on 'uncaughtException', (e) -> - console.error('Got unhandled exception', e, e?.stack) -Promise = require 'bluebird' -knex = require './db' -utils = require './utils' -bootstrap = require './bootstrap' -config = require './config' +Supervisor = require './supervisor' -knex.init.then -> - utils.mixpanelTrack('Supervisor start') - - console.log('Starting connectivity check..') - utils.connectivityCheck() - - Promise.join bootstrap.startBootstrapping(), utils.getOrGenerateSecret('api'), utils.getOrGenerateSecret('logsChannel'), (uuid, secret, logsChannel) -> - # Persist the uuid in subsequent metrics - utils.mixpanelProperties.uuid = uuid - utils.mixpanelProperties.distinct_id = uuid - - api = require './api' - application = require('./application')(logsChannel, bootstrap.offlineMode) - device = require './device' - - console.log('Starting API server..') - utils.createIpTablesRules() - .then -> - apiServer = api(application).listen(config.listenPort) - apiServer.timeout = config.apiTimeout - device.events.on 'shutdown', -> - apiServer.close() - bootstrap.done - .then -> - Promise.join( - device.getOSVersion() - device.getOSVariant() - (osVersion, osVariant) -> - # Let API know what version we are, and our api connection info. - console.log('Updating supervisor version and api info') - device.updateState( - api_port: config.listenPort - api_secret: secret - os_version: osVersion - os_variant: osVariant - supervisor_version: utils.supervisorVersion - provisioning_progress: null - provisioning_state: '' - download_progress: null - logs_channel: logsChannel - ) - ) - - updateIpAddr = -> - utils.gosuper.getAsync('/v1/ipaddr', { json: true }) - .spread (response, body) -> - if response.statusCode != 200 || !body.Data.IPAddresses? 
- throw new Error('Invalid response from gosuper') - device.reportHealthyGosuper() - device.updateState( - ip_address: body.Data.IPAddresses.join(' ') - ) - .catch -> - device.reportUnhealthyGosuper() - - console.log('Starting periodic check for IP addresses..') - setInterval(updateIpAddr, 30 * 1000) # Every 30s - updateIpAddr() - - console.log('Starting Apps..') - application.initialize() +supervisor = new Supervisor() +supervisor.init() diff --git a/src/application.coffee b/src/application.coffee deleted file mode 100644 index f872cbf1..00000000 --- a/src/application.coffee +++ /dev/null @@ -1,985 +0,0 @@ -_ = require 'lodash' -url = require 'url' -Lock = require 'rwlock' -knex = require './db' -config = require './config' -dockerUtils = require './docker-utils' -Promise = require 'bluebird' -utils = require './utils' -logger = require './lib/logger' -{ cachedResinApi } = require './request' -device = require './device' -lockFile = Promise.promisifyAll(require('lockfile')) -bootstrap = require './bootstrap' -TypedError = require 'typed-error' -fs = Promise.promisifyAll(require('fs')) -JSONStream = require 'JSONStream' -proxyvisor = require './proxyvisor' -{ checkInt, checkTruthy } = require './lib/validation' -osRelease = require './lib/os-release' -deviceConfig = require './device-config' -randomHexString = require './lib/random-hex-string' - -UPDATE_IDLE = 0 -UPDATE_UPDATING = 1 -UPDATE_REQUIRED = 2 -UPDATE_SCHEDULED = 3 - -updateStatus = - state: UPDATE_IDLE - failed: 0 - forceNext: false - intervalHandle: null - lastFullUpdateCycle: process.hrtime() - currentlyDownloading: false - timeSpentPulling: 0 - -class UpdatesLockedError extends TypedError -ImageNotFoundError = (err) -> - return "#{err.statusCode}" is '404' - -{ docker } = dockerUtils - -logTypes = - stopApp: - eventName: 'Application kill' - humanName: 'Killing application' - stopAppSuccess: - eventName: 'Application stop' - humanName: 'Killed application' - stopAppNoop: - eventName: 'Application already stopped' - humanName: 'Application is already stopped, removing container' - stopRemoveAppNoop: - eventName: 'Application already stopped and container removed' - humanName: 'Application is already stopped and the container removed' - stopAppError: - eventName: 'Application stop error' - humanName: 'Failed to kill application' - - downloadApp: - eventName: 'Application docker download' - humanName: 'Downloading application' - downloadAppDelta: - eventName: 'Application delta download' - humanName: 'Downloading delta for application' - downloadAppSuccess: - eventName: 'Application downloaded' - humanName: 'Downloaded application' - downloadAppError: - eventName: 'Application download error' - humanName: 'Failed to download application' - - installApp: - eventName: 'Application install' - humanName: 'Installing application' - installAppSuccess: - eventName: 'Application installed' - humanName: 'Installed application' - installAppError: - eventName: 'Application install error' - humanName: 'Failed to install application' - - deleteImageForApp: - eventName: 'Application image removal' - humanName: 'Deleting image for application' - deleteImageForAppSuccess: - eventName: 'Application image removed' - humanName: 'Deleted image for application' - deleteImageForAppError: - eventName: 'Application image removal error' - humanName: 'Failed to delete image for application' - imageAlreadyDeleted: - eventName: 'Image already deleted' - humanName: 'Image already deleted for application' - - startApp: - eventName: 'Application start' - 
humanName: 'Starting application' - startAppSuccess: - eventName: 'Application started' - humanName: 'Started application' - startAppNoop: - eventName: 'Application already running' - humanName: 'Application is already running' - startAppError: - eventName: 'Application start error' - humanName: 'Failed to start application' - - updateApp: - eventName: 'Application update' - humanName: 'Updating application' - updateAppError: - eventName: 'Application update error' - humanName: 'Failed to update application' - - appExit: - eventName: 'Application exit' - humanName: 'Application exited' - - appRestart: - eventName: 'Application restart' - humanName: 'Restarting application' - - updateAppConfig: - eventName: 'Application config update' - humanName: 'Updating config for application' - updateAppConfigSuccess: - eventName: 'Application config updated' - humanName: 'Updated config for application' - updateAppConfigError: - eventName: 'Application config update error' - humanName: 'Failed to update config for application' - -application = {} -application.UpdatesLockedError = UpdatesLockedError -application.localMode = false - -application.healthy = -> - timeSinceLastCycle = process.hrtime(updateStatus.lastFullUpdateCycle)[0] * 1000 - return bootstrap.offlineMode or updateStatus.currentlyDownloading or timeSinceLastCycle - updateStatus.timeSpentPulling <= 2 * config.appUpdatePollInterval - -application.logSystemMessage = logSystemMessage = (message, obj, eventName) -> - logger.log({ m: message, s: 1 }) - utils.mixpanelTrack(eventName ? message, obj) - -logSystemEvent = (logType, app = {}, error) -> - message = "#{logType.humanName} '#{app.imageId}'" - if error? - # Report the message from the original cause to the user. - errMessage = error.message - if _.isEmpty(errMessage) - errMessage = error.json - if _.isEmpty(errMessage) - errMessage = error.reason - if _.isEmpty(errMessage) - errMessage = 'Unknown cause' - message += " due to '#{errMessage}'" - logSystemMessage(message, { app, error }, logType.eventName) - return - -logSpecialAction = (action, value, success) -> - if success? - if success - if !value? - msg = "Cleared config variable #{action}" - else - msg = "Applied config variable #{action} = #{value}" - else - if !value? - msg = "Config variable #{action} not cleared yet" - else - msg = "Config variable #{action} = #{value} not applied yet" - else - if !value? - msg = "Clearing config variable #{action}" - else - msg = "Applying config variable #{action} = #{value}" - logSystemMessage(msg, {}, "Apply special action #{if success then "success" else "in progress"}") - -application.kill = kill = (app, { updateDB = true, removeContainer = true } = {}) -> - logSystemEvent(logTypes.stopApp, app) - device.updateState(status: 'Stopping') - container = docker.getContainer(app.containerName) - container.stop(t: 10) - .then -> - container.remove(v: true) if removeContainer - return - .catch (err) -> - # Get the statusCode from the original cause and make sure statusCode its definitely a string for comparison - # reasons. - statusCode = '' + err.statusCode - # 304 means the container was already stopped - so we can just remove it - if statusCode is '304' - logSystemEvent(logTypes.stopAppNoop, app) - container.remove(v: true) if removeContainer - return - # 404 means the container doesn't exist, precisely what we want! 
:D - if statusCode is '404' - logSystemEvent(logTypes.stopRemoveAppNoop, app) - return - throw err - .tap -> - lockFile.unlockAsync(tmpLockPath(app)) - .tap -> - device.isResinOSv1() - .then (isV1) -> - lockFile.unlockAsync(persistentLockPath(app)) if isV1 - .tap -> - logSystemEvent(logTypes.stopAppSuccess, app) - if removeContainer && updateDB - app.containerName = null - knex('app').update(app).where(appId: app.appId) - .catch (err) -> - logSystemEvent(logTypes.stopAppError, app, err) - throw err - -application.deleteImage = deleteImage = (app) -> - logSystemEvent(logTypes.deleteImageForApp, app) - docker.getImage(app.imageId).remove(force: true) - .then -> - logSystemEvent(logTypes.deleteImageForAppSuccess, app) - .catch ImageNotFoundError, (err) -> - logSystemEvent(logTypes.imageAlreadyDeleted, app) - .catch (err) -> - logSystemEvent(logTypes.deleteImageForAppError, app, err) - throw err - -isValidPort = (port) -> - maybePort = parseInt(port, 10) - return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535 - -fetch = (app, { deltaSource, setDeviceUpdateState = true } = {}) -> - onProgress = (progress) -> - device.updateState(download_progress: progress.percentage) - - startTime = process.hrtime() - docker.getImage(app.imageId).inspect() - .catch ImageNotFoundError, -> - updateStatus.currentlyDownloading = true - device.updateState(status: 'Downloading', download_progress: 0) - - Promise.try -> - conf = JSON.parse(app.config) - Promise.join utils.getConfig('apiKey'), utils.getConfig('uuid'), (apiKey, uuid) -> - if checkTruthy(conf['RESIN_SUPERVISOR_DELTA']) - logSystemEvent(logTypes.downloadAppDelta, app) - deltaOpts = { - uuid, apiKey, deltaSource - # use user-defined timeouts, but fallback to defaults if none is provided. - requestTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_REQUEST_TIMEOUT'], positive: true) - applyTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_APPLY_TIMEOUT'], positive: true) - retryCount: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_COUNT'], positive: true) - retryInterval: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL'], positive: true) - } - dockerUtils.rsyncImageWithProgress(app.imageId, deltaOpts, onProgress) - else - logSystemEvent(logTypes.downloadApp, app) - dockerUtils.fetchImageWithProgress(app.imageId, onProgress, { uuid, apiKey }) - .then -> - logSystemEvent(logTypes.downloadAppSuccess, app) - device.updateState(status: 'Idle', download_progress: null) - device.setUpdateState(update_downloaded: true) if setDeviceUpdateState - docker.getImage(app.imageId).inspect() - .catch (err) -> - logSystemEvent(logTypes.downloadAppError, app, err) - throw err - .finally -> - updateStatus.timeSpentPulling += process.hrtime(startTime)[0] - updateStatus.currentlyDownloading = false - -shouldMountKmod = (image) -> - device.isResinOSv1().then (isV1) -> - return false if not isV1 - Promise.using docker.imageRootDirMounted(image), (rootDir) -> - osRelease.getOSVersion(rootDir + '/etc/os-release') - .then (version) -> - return version? and /^(Debian|Raspbian)/i.test(version) - .catch (err) -> - console.error('Error getting app OS release: ', err) - return false - -isExecFormatError = (err) -> - message = '' - try - message = err.json.trim() - /exec format error$/.test(message) - -generateContainerName = (app) -> - randomHexString.generate() - .then (randomString) -> - sanitisedAppName = (app.name ? 
'app').replace(/[^a-zA-Z0-9]+/g, '_') - return "#{sanitisedAppName}_#{randomString}" - -application.start = start = (app) -> - device.isResinOSv1().then (isV1) -> - volumes = utils.defaultVolumes(isV1) - binds = utils.defaultBinds(app.appId, isV1) - alreadyStarted = false - Promise.try -> - # Parse the env vars before trying to access them, that's because they have to be stringified for knex.. - return [ JSON.parse(app.env), JSON.parse(app.config) ] - .spread (env, conf) -> - if env.PORT? - portList = env.PORT - .split(',') - .map((port) -> port.trim()) - .filter(isValidPort) - - if app.containerName? - # If we have a container id then check it exists and if so use it. - container = docker.getContainer(app.containerName) - containerPromise = container.inspect().return(container) - else - containerPromise = Promise.rejected() - - # If there is no existing container then create one instead. - containerPromise.catch -> - # If the image is not available, we'll let the update cycle fix it - docker.getImage(app.imageId).inspect() - .then (imageInfo) -> - logSystemEvent(logTypes.installApp, app) - device.updateState(status: 'Installing') - generateContainerName(app) - .tap (name) -> - app.containerName = name - knex('app').update(app).where(appId: app.appId) - .then (affectedRows) -> - knex('app').insert(app) if affectedRows == 0 - .then (name) -> - ports = {} - portBindings = {} - if portList? - portList.forEach (port) -> - ports[port + '/tcp'] = {} - portBindings[port + '/tcp'] = [ HostPort: port ] - - if imageInfo?.Config?.Cmd - cmd = imageInfo.Config.Cmd - else - cmd = [ '/bin/bash', '-c', '/start' ] - - restartPolicy = createRestartPolicy({ name: conf['RESIN_APP_RESTART_POLICY'], maximumRetryCount: conf['RESIN_APP_RESTART_RETRIES'] }) - shouldMountKmod(app.imageId) - .then (shouldMount) -> - binds.push('/bin/kmod:/bin/kmod:ro') if shouldMount - docker.createContainer( - name: name - Image: app.imageId - Cmd: cmd - Tty: true - Volumes: volumes - Env: _.map env, (v, k) -> k + '=' + v - ExposedPorts: ports - HostConfig: - Privileged: true - NetworkMode: 'host' - PortBindings: portBindings - Binds: binds - RestartPolicy: restartPolicy - ) - .tap -> - logSystemEvent(logTypes.installAppSuccess, app) - .catch (err) -> - logSystemEvent(logTypes.installAppError, app, err) - throw err - .tap (container) -> - logSystemEvent(logTypes.startApp, app) - device.updateState(status: 'Starting') - container.start() - .catch (err) -> - statusCode = '' + err.statusCode - # 304 means the container was already started, precisely what we want :) - if statusCode is '304' - alreadyStarted = true - return - - if statusCode is '500' and isExecFormatError(err) - # Provide a friendlier error message for "exec format error" - device.getDeviceType() - .then (deviceType) -> - throw new Error("Application architecture incompatible with #{deviceType}: exec format error") - else - # rethrow the same error - throw err - .catch (err) -> - # If starting the container failed, we remove it so that it doesn't litter - container.remove(v: true) - .finally -> - throw err - .then -> - device.updateState(commit: app.commit) - logger.attach(app) - .catch (err) -> - # If creating or starting the app failed, we soft-delete it so that - # the next update cycle doesn't think the app is up to date - app.containerName = null - app.markedForDeletion = true - knex('app').update(app).where(appId: app.appId) - .finally -> - logSystemEvent(logTypes.startAppError, app, err) - throw err - .tap -> - if alreadyStarted - 
logSystemEvent(logTypes.startAppNoop, app) - else - logSystemEvent(logTypes.startAppSuccess, app) - .finally -> - device.updateState(status: 'Idle') - -validRestartPolicies = [ 'no', 'always', 'on-failure', 'unless-stopped' ] -# Construct a restart policy based on its name and maximumRetryCount. -# Both arguments are optional, and the default policy is "always". -# -# Throws exception if an invalid policy name is given. -# Returns a RestartPolicy { Name, MaximumRetryCount } object -createRestartPolicy = ({ name, maximumRetryCount }) -> - if not name? - name = 'always' - if not (name in validRestartPolicies) - throw new Error("Invalid restart policy: #{name}") - policy = { Name: name } - if name is 'on-failure' and maximumRetryCount? - policy.MaximumRetryCount = maximumRetryCount - return policy - -persistentLockPath = (app) -> - appId = app.appId ? app - return "/mnt/root#{config.dataPath}/#{appId}/resin-updates.lock" - -tmpLockPath = (app) -> - appId = app.appId ? app - return "/mnt/root/tmp/resin-supervisor/#{appId}/resin-updates.lock" - -killmePath = (app) -> - appId = app.appId ? app - return "/mnt/root#{config.dataPath}/#{appId}/resin-kill-me" - -# At boot, all apps should be unlocked *before* start to prevent a deadlock -application.unlockAndStart = unlockAndStart = (app) -> - lockFile.unlockAsync(persistentLockPath(app)) - .then -> - start(app) - -ENOENT = (err) -> err.code is 'ENOENT' - -application.lockUpdates = lockUpdates = do -> - _lock = new Lock() - _writeLock = Promise.promisify(_lock.async.writeLock) - return (app, force) -> - device.isResinOSv1() - .then (isV1) -> - persistentLockName = persistentLockPath(app) - tmpLockName = tmpLockPath(app) - _writeLock(tmpLockName) - .tap (release) -> - if isV1 and force != true - lockFile.lockAsync(persistentLockName) - .catch ENOENT, _.noop - .catch (err) -> - release() - throw new UpdatesLockedError("Updates are locked: #{err.message}") - .tap (release) -> - if force != true - lockFile.lockAsync(tmpLockName) - .catch ENOENT, _.noop - .catch (err) -> - Promise.try -> - lockFile.unlockAsync(persistentLockName) if isV1 - .finally -> - release() - throw new UpdatesLockedError("Updates are locked: #{err.message}") - .disposer (release) -> - Promise.try -> - lockFile.unlockAsync(tmpLockName) if force != true - .then -> - lockFile.unlockAsync(persistentLockName) if isV1 and force != true - .finally -> - release() - -joinErrorMessages = (failures) -> - s = if failures.length > 1 then 's' else '' - messages = _.map failures, (err) -> - err.message or err - "#{failures.length} error#{s}: #{messages.join(' - ')}" - -# Function to start the application update polling -application.poll = -> - updateStatus.intervalHandle = setInterval(-> - application.update() - , config.appUpdatePollInterval) - -# Callback function to set the API poll interval dynamically. -apiPollInterval = (val) -> - config.appUpdatePollInterval = checkInt(val, positive: true) ? 60000 - console.log('New API poll interval: ' + val) - clearInterval(updateStatus.intervalHandle) - application.poll() - return true - -setLocalMode = (val) -> - mode = checkTruthy(val) ? 
false - device.getOSVariant() - .then (variant) -> - if variant is not 'dev' - logSystemMessage('Not a development OS, ignoring local mode', {}, 'Ignore local mode') - return - Promise.try -> - if mode and !application.localMode - logSystemMessage('Entering local mode, app will be forcefully stopped', {}, 'Enter local mode') - Promise.map utils.getKnexApps(), (theApp) -> - Promise.using application.lockUpdates(theApp.appId, true), -> - # There's a slight chance the app changed after the previous select - # So we fetch it again now the lock is acquired - utils.getKnexApp(theApp.appId) - .then (app) -> - application.kill(app) if app? - else if !mode and application.localMode - logSystemMessage('Exiting local mode, app will be resumed', {}, 'Exit local mode') - Promise.map utils.getKnexApps(), (app) -> - unlockAndStart(app) - .then -> - application.localMode = mode - .return(true) - -specialActionConfigVars = [ - [ 'RESIN_SUPERVISOR_LOCAL_MODE', setLocalMode ] - [ 'RESIN_SUPERVISOR_VPN_CONTROL', utils.vpnControl ] - [ 'RESIN_SUPERVISOR_CONNECTIVITY_CHECK', utils.enableConnectivityCheck ] - [ 'RESIN_SUPERVISOR_POLL_INTERVAL', apiPollInterval ] - [ 'RESIN_SUPERVISOR_LOG_CONTROL', utils.resinLogControl ] -] -specialActionConfigKeys = _.map(specialActionConfigVars, 0) - -executedSpecialActionConfigVars = {} - -executeSpecialActionsAndHostConfig = (conf, oldConf, opts) -> - updatedValues = _.pickBy _.clone(oldConf), (val, key) -> - _.startsWith(key, device.hostConfigConfigVarPrefix) or _.includes(specialActionConfigKeys, key) - needsReboot = false - Promise.mapSeries specialActionConfigVars, ([ key, specialActionCallback ]) -> - # This makes the Special Action Envs only trigger their functions once. - if specialActionCallback? and executedSpecialActionConfigVars[key] != conf[key] - logSpecialAction(key, conf[key]) - Promise.try -> - specialActionCallback(conf[key], logSystemMessage, opts) - .then (updated) -> - if updated - updatedValues[key] = conf[key] - executedSpecialActionConfigVars[key] = conf[key] - logSpecialAction(key, conf[key], updated) - .then -> - hostConfigVars = _.pickBy conf, (val, key) -> - return _.startsWith(key, device.hostConfigConfigVarPrefix) - oldHostConfigVars = _.pickBy oldConf, (val, key) -> - return _.startsWith(key, device.hostConfigConfigVarPrefix) - if !_.isEqual(hostConfigVars, oldHostConfigVars) - device.setHostConfig(hostConfigVars, oldHostConfigVars, logSystemMessage) - .then (changedHostConfig) -> - needsReboot = changedHostConfig - if changedHostConfig - for own key of oldHostConfigVars - delete updatedValues[key] - _.assign(updatedValues, hostConfigVars) - .then -> - return { updatedValues, needsReboot } - -getAndApplyDeviceConfig = ({ initial = false } = {}) -> - deviceConfig.get() - .then ({ values, targetValues }) -> - executeSpecialActionsAndHostConfig(targetValues, values, { initial }) - .tap ({ updatedValues }) -> - deviceConfig.set({ values: updatedValues }) if !_.isEqual(values, updatedValues) - .then ({ needsReboot }) -> - if needsReboot - logSystemMessage('Rebooting', {}, 'Reboot') - Promise.delay(1000) - .then -> - device.reboot() - -wrapAsError = (err) -> - return err if _.isError(err) - return new Error(err.message ? err) - -# Wait for app to signal it's ready to die, or timeout to complete. -# timeout defaults to 1 minute. -waitToKill = (app, timeout) -> - startTime = Date.now() - pollInterval = 100 - timeout = checkInt(timeout, positive: true) ? 
60000 - checkFileOrTimeout = -> - fs.statAsync(killmePath(app)) - .catch (err) -> - throw err unless (Date.now() - startTime) > timeout - .then -> - fs.unlinkAsync(killmePath(app)).catch(_.noop) - retryCheck = -> - checkFileOrTimeout() - .catch -> - Promise.delay(pollInterval).then(retryCheck) - retryCheck() - -updateStrategies = - 'download-then-kill': ({ localApp, app, needsDownload, force, deltaSource }) -> - Promise.try -> - fetch(app, { deltaSource }) if needsDownload - .then -> - Promise.using lockUpdates(localApp, force), -> - logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId - utils.getKnexApp(localApp.appId) - .then(kill) - .then -> - start(app) - .catch (err) -> - logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError - throw err - 'kill-then-download': ({ localApp, app, needsDownload, force, deltaSource }) -> - Promise.using lockUpdates(localApp, force), -> - logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId - utils.getKnexApp(localApp.appId) - .then(kill) - .then -> - fetch(app, { deltaSource }) if needsDownload - .then -> - start(app) - .catch (err) -> - logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError - throw err - 'delete-then-download': ({ localApp, app, needsDownload, force, deltaSource }) -> - Promise.using lockUpdates(localApp, force), -> - logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId - utils.getKnexApp(localApp.appId) - .tap(kill) - .then (appFromDB) -> - # If we don't need to download a new image, - # there's no use in deleting the image - if needsDownload - deleteImage(appFromDB) - .then -> - fetch(app, { deltaSource }) - .then -> - start(app) - .catch (err) -> - logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError - throw err - 'hand-over': ({ localApp, app, needsDownload, force, timeout, deltaSource }) -> - Promise.using lockUpdates(localApp, force), -> - utils.getKnexApp(localApp.appId) - .then (localApp) -> - Promise.try -> - fetch(app, { deltaSource }) if needsDownload - .then -> - logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId - start(app) - .then -> - waitToKill(localApp, timeout) - .then -> - kill(localApp, updateDB: false) - .catch (err) -> - logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError - throw err - - -updateUsingStrategy = (strategy, options) -> - if not _.has(updateStrategies, strategy) - strategy = 'download-then-kill' - updateStrategies[strategy](options) - -getRemoteState = (uuid, apiKey) -> - endpoint = url.resolve(config.apiEndpoint, "/device/v1/#{uuid}/state") - - requestParams = _.extend - method: 'GET' - url: "#{endpoint}?&apikey=#{apiKey}" - , cachedResinApi.passthrough - - cachedResinApi._request(requestParams) - .timeout(config.apiTimeout) - .catch (err) -> - console.error("Failed to get state for device #{uuid}. 
#{err}") - throw err - -# TODO: Actually store and use app.environment and app.config separately -parseEnvAndFormatRemoteApps = (remoteApps, uuid, deviceApiKey) -> - appsWithEnv = _.mapValues remoteApps, (app, appId) -> - utils.extendEnvVars(app.environment, uuid, deviceApiKey, appId, app.name, app.commit) - .then (env) -> - app.config ?= {} - return { - appId - commit: app.commit - imageId: app.image - env: JSON.stringify(env) - config: JSON.stringify(app.config) - name: app.name - markedForDeletion: false - } - Promise.props(appsWithEnv) - -formatLocalApps = (apps) -> - apps = _.keyBy(apps, 'appId') - localApps = _.mapValues apps, (app) -> - app = _.pick(app, [ 'appId', 'commit', 'imageId', 'env', 'config', 'name', 'markedForDeletion' ]) - app.markedForDeletion = checkTruthy(app.markedForDeletion) - return app - return localApps - -restartVars = (conf) -> - return _.pick(conf, [ 'RESIN_DEVICE_RESTART', 'RESIN_RESTART' ]) - -compareForUpdate = (localApps, remoteApps) -> - remoteAppIds = _.keys(remoteApps) - localAppIds = _.keys(localApps) - - toBeRemoved = _.difference(localAppIds, remoteAppIds) - toBeInstalled = _.difference(remoteAppIds, localAppIds) - - matchedAppIds = _.intersection(remoteAppIds, localAppIds) - toBeUpdated = _.filter matchedAppIds, (appId) -> - localApp = _.omit(localApps[appId], 'config') - remoteApp = _.omit(remoteApps[appId], 'config') - localApp.env = _.omit(JSON.parse(localApp.env), 'RESIN_DEVICE_NAME_AT_INIT') - remoteApp.env = _.omit(JSON.parse(remoteApp.env), 'RESIN_DEVICE_NAME_AT_INIT') - return !_.isEqual(remoteApp, localApp) or - !_.isEqual(restartVars(JSON.parse(localApps[appId].config)), restartVars(JSON.parse(remoteApps[appId].config))) - - appsWithUpdatedConfigs = _.filter matchedAppIds, (appId) -> - return !_.includes(toBeUpdated, appId) and - !_.isEqual(localApps[appId].config, remoteApps[appId].config) - - toBeDownloaded = _.filter toBeUpdated, (appId) -> - return !_.isEqual(remoteApps[appId].imageId, localApps[appId].imageId) - toBeDownloaded = _.union(toBeDownloaded, toBeInstalled) - - # The order in this _.union ensures that installations (new appIds) go last, so - # they will happen *after* removals. - allAppIds = _.union(localAppIds, remoteAppIds) - return { toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds } - -application.update = update = (force, scheduled = false) -> - switch updateStatus.state - when UPDATE_SCHEDULED - if scheduled isnt true - # There's an update scheduled but it isn't this one, so just stop - # but if we have to force an update, do it in the one that is scheduled - updateStatus.forceNext or= force - return - when UPDATE_IDLE - # All good, carry on with the update. - else - # Mark an update required after the current in-progress update. 
- updateStatus.forceNext or= force - updateStatus.state = UPDATE_REQUIRED - return - - force or= updateStatus.forceNext - updateStatus.forceNext = false - updateStatus.state = UPDATE_UPDATING - bootstrap.done.then -> - Promise.join utils.getConfig('apiKey'), utils.getConfig('uuid'), utils.getConfig('name'), knex('app').select(), (apiKey, uuid, deviceName, apps) -> - getRemoteState(uuid, apiKey) - .then ({ local, dependent }) -> - proxyvisor.fetchAndSetTargetsForDependentApps(dependent, fetch, apiKey) - .then -> - utils.setConfig('name', local.name) if local.name != deviceName - .then -> - utils.getConfig('deviceApiKey') - .then (deviceApiKey) -> - parseEnvAndFormatRemoteApps(local.apps, uuid, deviceApiKey) - .then (remoteApps) -> - localApps = formatLocalApps(apps) - resourcesForUpdate = compareForUpdate(localApps, remoteApps) - { toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds } = resourcesForUpdate - - if !_.isEmpty(toBeRemoved) or !_.isEmpty(toBeInstalled) or !_.isEmpty(toBeUpdated) - device.setUpdateState(update_pending: true) - deltaSource = {} - - Promise.map toBeDownloaded, (appId) -> - if _.includes(toBeUpdated, appId) - deltaSource[appId] = localApps[appId].imageId - else if !_.isEmpty(apps) # apps is localApps as an array - deltaSource[appId] = apps[0].imageId - .then -> - # Before running the updates, try to clean up any images that aren't in use - # and will not be used in the target state - return if application.localMode or device.shuttingDown - dockerUtils.cleanupContainersAndImages(_.map(remoteApps, 'imageId').concat(_.values(deltaSource))) - .catch (err) -> - console.log('Cleanup failed: ', err, err.stack) - .then -> - remoteDeviceConfig = {} - _.map remoteAppIds, (appId) -> - _.merge(remoteDeviceConfig, JSON.parse(remoteApps[appId].config)) - remoteDeviceHostConfig = _.pickBy(remoteDeviceConfig, (val, key) -> _.startsWith(key, device.hostConfigConfigVarPrefix)) - remoteSpecialActionValues = _.pickBy(remoteDeviceConfig, (val, key) -> _.includes(specialActionConfigKeys, key)) - # This prevents saving values like RESIN_SUPERVISOR_DELTA, that (for now) actually belong to app config, to the deviceConfig table - relevantDeviceConfig = _.assign({}, remoteDeviceHostConfig, remoteSpecialActionValues) - deviceConfig.get() - .then ({ values, targetValues }) -> - # If the new device config is different from the target values we had, or if - # for some reason it hasn't been applied yet (values don't match target), we apply it. - if !_.isEqual(targetValues, remoteDeviceConfig) or !_.isEqual(targetValues, values) or _.some(remoteSpecialActionValues, (v, k) -> executedSpecialActionConfigVars[k] != v) - deviceConfig.set({ targetValues: relevantDeviceConfig }) - .then -> - getAndApplyDeviceConfig() - .catch (err) -> - logSystemMessage("Error fetching/applying device configuration: #{err}", { error: err }, 'Set device configuration error') - .return(allAppIds) - .mapSeries (appId) -> - # The sorting of allAppIds from compareForUpdate ensures that toBeRemoved is - # done before toBeInstalled, which is the behavior we want. 
- return if application.localMode or device.shuttingDown - Promise.try -> - needsDownload = _.includes(toBeDownloaded, appId) - if _.includes(toBeRemoved, appId) - app = localApps[appId] - Promise.using lockUpdates(app, force), -> - # We get the app from the DB again in case someone restarted it - # (which would have changed its containerName) - utils.getKnexApp(appId) - .then(kill) - .then -> - knex('app').where('appId', appId).update(markedForDeletion: true) - .catch (err) -> - logSystemEvent(logTypes.updateAppError, app, err) - throw err - else if _.includes(toBeInstalled, appId) - app = remoteApps[appId] - Promise.try -> - fetch(app, { deltaSource: deltaSource[appId] }) if needsDownload - .then -> - start(app) - else if _.includes(toBeUpdated, appId) - app = remoteApps[appId] - conf = JSON.parse(app.config) - forceThisApp = - checkTruthy(conf['RESIN_SUPERVISOR_OVERRIDE_LOCK']) || - checkTruthy(conf['RESIN_OVERRIDE_LOCK']) - strategy = conf['RESIN_SUPERVISOR_UPDATE_STRATEGY'] - timeout = conf['RESIN_SUPERVISOR_HANDOVER_TIMEOUT'] - updateUsingStrategy strategy, { - localApp: localApps[appId] - app - needsDownload - force: force || forceThisApp - timeout - deltaSource: deltaSource[appId] - } - else if _.includes(appsWithUpdatedConfigs, appId) - # These apps have no changes other than config variables. - # It can notoriously affect setting dep. devices hook address - # if nothing else changes in the app. - # So we just save them. - app = remoteApps[appId] - logSystemEvent(logTypes.updateAppConfig, app) - knex('app').update(app).where({ appId }) - .then -> - logSystemEvent(logTypes.updateAppConfigSuccess, app) - .catch (err) -> - logSystemEvent(logTypes.updateAppConfigError, app, err) - throw err - .catch(wrapAsError) - .filter(_.isError) - .then (failures) -> - _.each(failures, (err) -> console.error('Error:', err, err.stack)) - throw new Error(joinErrorMessages(failures)) if failures.length > 0 - .then -> - proxyvisor.sendUpdates() if !device.shuttingDown - .then -> - return if application.localMode or device.shuttingDown - updateStatus.failed = 0 - device.setUpdateState(update_pending: false, update_downloaded: false, update_failed: false) - # We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a - # point where we might clean up an image we still want. - knex('app').where(markedForDeletion: true).del() - .then -> - dockerUtils.cleanupContainersAndImages() - .catch (err) -> - updateStatus.failed++ - device.setUpdateState(update_failed: true) - if updateStatus.state in [ UPDATE_REQUIRED, UPDATE_SCHEDULED ] - console.log('Updating failed, but there is already another update scheduled immediately: ', err) - return - delayTime = Math.min((2 ** updateStatus.failed) * 500, config.appUpdatePollInterval) - # If there was an error then schedule another attempt briefly in the future. 
- console.log('Scheduling another update attempt due to failure: ', delayTime, err) - setTimeout(update, delayTime, force, true) - updateStatus.state = UPDATE_SCHEDULED - .finally -> - switch updateStatus.state - when UPDATE_REQUIRED - # If an update is required then schedule it - setTimeout(update, 1, false, true) - updateStatus.state = UPDATE_SCHEDULED - when UPDATE_SCHEDULED - # Already scheduled, nothing to do here - else - updateStatus.state = UPDATE_IDLE - device.updateState(status: 'Idle') - updateStatus.lastFullUpdateCycle = process.hrtime() - updateStatus.timeSpentPulling = 0 - return - -sanitiseContainerName = (name) -> name.replace(/^\//, '') - -listenToEvents = do -> - appHasDied = {} - return -> - docker.getEvents() - .then (stream) -> - stream.on 'error', (err) -> - console.error('Error on docker events stream:', err, err.stack) - parser = JSONStream.parse() - parser.on 'error', (err) -> - console.error('Error on docker events JSON stream:', err, err.stack) - parser.on 'data', (data) -> - if data?.Type? && data.Type == 'container' && data.status in ['die', 'start'] && data.Actor?.Attributes?.Name? - knex('app').select().where({ containerName: sanitiseContainerName(data.Actor.Attributes.Name) }) - .then ([ app ]) -> - if app? - if data.status == 'die' - logSystemEvent(logTypes.appExit, app) - appHasDied[app.containerName] = true - else if data.status == 'start' and appHasDied[app.containerName] - logSystemEvent(logTypes.appRestart, app) - logger.attach(app) - .catch (err) -> - console.error('Error on docker event:', err, err.stack) - parser.on 'end', -> - console.error('Docker events stream ended, this should never happen') - listenToEvents() - stream.pipe(parser) - .catch (err) -> - console.error('Error listening to events:', err, err.stack) - -migrateContainerIdApps = -> - knex.schema.hasColumn('app', 'containerId') - .then (exists) -> - return if not exists - knex('app').whereNotNull('containerId').select() - .then (apps) -> - return if !apps? or apps.length == 0 - Promise.map apps, (app) -> - docker.getContainer(app.containerId).inspect() - .catchReturn({ Name: null }) - .then (container) -> - if container.Name? 
- app.containerName = sanitiseContainerName(container.Name) - else - app.containerName = null - app.containerId = null - knex('app').update(app).where({ id: app.id }) - .then -> - knex.schema.table 'app', (table) -> - table.dropColumn('containerId') - -application.getContainerId = (app) -> - docker.getContainer(app.containerName).inspect() - .catchReturn({ Id: null }) - .then (container) -> - return container.Id - -application.initialize = -> - listenToEvents() - migrateContainerIdApps() - .then -> - getAndApplyDeviceConfig(initial: true) - .then -> - knex('app').whereNot(markedForDeletion: true).orWhereNull('markedForDeletion').select() - .map (app) -> - unlockAndStart(app) if !application.localMode and !device.shuttingDown - .catch (error) -> - console.error('Error starting apps:', error) - .then -> - utils.mixpanelTrack('Start application update poll', { interval: config.appUpdatePollInterval }) - application.poll() - application.update() - -module.exports = (logsChannel, offlineMode) -> - logger.init( - pubnub: config.pubnub - channel: "device-#{logsChannel}-logs" - offlineMode: offlineMode - ) - return application diff --git a/src/bootstrap.coffee b/src/bootstrap.coffee deleted file mode 100644 index 14e09f38..00000000 --- a/src/bootstrap.coffee +++ /dev/null @@ -1,275 +0,0 @@ -Promise = require 'bluebird' -knex = require './db' -utils = require './utils' -deviceRegister = require 'resin-register-device' -{ resinApi, request } = require './request' -fs = Promise.promisifyAll(require('fs')) -config = require './config' -appsPath = '/boot/apps.json' -_ = require 'lodash' -deviceConfig = require './device-config' -TypedError = require 'typed-error' -osRelease = require './lib/os-release' -semver = require 'semver' -semverRegex = require('semver-regex') -configJson = require './config-json' - -DuplicateUuidError = (err) -> _.startsWith(err.message, '"uuid" must be unique') -exports.ExchangeKeyError = class ExchangeKeyError extends TypedError - -bootstrapper = {} - -loadPreloadedApps = -> - devConfig = {} - knex('app').select() - .then (apps) -> - if apps.length > 0 - console.log('Preloaded apps already loaded, skipping') - return - configJson.getAll() - .then (userConfig) -> - fs.readFileAsync(appsPath, 'utf8') - .then(JSON.parse) - .map (app) -> - utils.extendEnvVars(app.env, userConfig.uuid, userConfig.deviceApiKey, app.appId, app.name, app.commit) - .then (extendedEnv) -> - app.env = JSON.stringify(extendedEnv) - app.markedForDeletion = false - _.merge(devConfig, app.config) - app.config = JSON.stringify(app.config) - knex('app').insert(app) - .then -> - deviceConfig.set({ targetValues: devConfig }) - .catch (err) -> - utils.mixpanelTrack('Loading preloaded apps failed', { error: err }) - -fetchDevice = (uuid, apiKey) -> - resinApi.get - resource: 'device' - options: - filter: - uuid: uuid - customOptions: - apikey: apiKey - .get(0) - .catchReturn(null) - .timeout(config.apiTimeout) - -exchangeKey = -> - configJson.getAll() - .then (userConfig) -> - Promise.try -> - # If we have an existing device key we first check if it's valid, because if it is we can just use that - if userConfig.deviceApiKey? - fetchDevice(userConfig.uuid, userConfig.deviceApiKey) - .then (device) -> - if device? - return device - # If it's not valid/doesn't exist then we try to use the user/provisioning api key for the exchange - fetchDevice(userConfig.uuid, userConfig.apiKey) - .then (device) -> - if not device? 
- throw new ExchangeKeyError("Couldn't fetch device with provisioning key") - # We found the device, we can try to register a working device key for it - Promise.try -> - if !userConfig.deviceApiKey? - deviceApiKey = deviceRegister.generateUniqueKey() - configJson.set({ deviceApiKey }) - .return(deviceApiKey) - else - return userConfig.deviceApiKey - .then (deviceApiKey) -> - request.postAsync("#{config.apiEndpoint}/api-key/device/#{device.id}/device-key?apikey=#{userConfig.apiKey}", { - json: true - body: - apiKey: deviceApiKey - }) - .spread (res, body) -> - if res.statusCode != 200 - throw new ExchangeKeyError("Couldn't register device key with provisioning key") - .return(device) - -bootstrap = -> - configJson.get('deviceType') - .then (deviceType) -> - if !deviceType? - configJson.set(deviceType: 'raspberry-pi') - .then -> - configJson.getAll() - .then (userConfig) -> - if userConfig.registered_at? - return userConfig - - deviceRegister.register( - userId: userConfig.userId - applicationId: userConfig.applicationId - uuid: userConfig.uuid - deviceType: userConfig.deviceType - deviceApiKey: userConfig.deviceApiKey - provisioningApiKey: userConfig.apiKey - apiEndpoint: config.apiEndpoint - ) - .timeout(config.apiTimeout) - .catch DuplicateUuidError, -> - console.log('UUID already registered, trying a key exchange') - exchangeKey() - .tap -> - console.log('Key exchange succeeded, all good') - .tapCatch ExchangeKeyError, (err) -> - # If it fails we just have to reregister as a provisioning key doesn't have the ability to change existing devices - console.log('Exchanging key failed, having to reregister') - generateRegistration(true) - .then ({ id }) -> - toUpdate = {} - toDelete = [] - if !userConfig.registered_at? - toUpdate.registered_at = Date.now() - toUpdate.deviceId = id - osRelease.getOSVersion(config.hostOSVersionPath) - .then (osVersion) -> - # Delete the provisioning key now, only if the OS supports it - hasSupport = hasDeviceApiKeySupport(osVersion) - if hasSupport - toDelete.push('apiKey') - else - toUpdate.apiKey = userConfig.deviceApiKey - configJson.set(toUpdate, toDelete) - .then -> - configJson.getAll() - .then (userConfig) -> - console.log('Finishing bootstrapping') - knex('config').whereIn('key', ['uuid', 'apiKey', 'username', 'userId', 'version']).delete() - .then -> - knex('config').insert([ - { key: 'uuid', value: userConfig.uuid } - # We use the provisioning/user `apiKey` if it still exists because if it does it means we were already registered - # using that key and have to rely on the exchange key mechanism to swap the keys as appropriate later - { key: 'apiKey', value: userConfig.apiKey ? 
userConfig.deviceApiKey } - { key: 'deviceApiKey', value: userConfig.deviceApiKey } - { key: 'username', value: userConfig.username } - { key: 'userId', value: userConfig.userId } - { key: 'version', value: utils.supervisorVersion } - ]) - .tap -> - bootstrapper.doneBootstrapping() - -generateRegistration = (forceReregister = false) -> - Promise.try -> - if forceReregister - configJson.set({ uuid: deviceRegister.generateUniqueKey(), deviceApiKey: deviceRegister.generateUniqueKey() }) - else - configJson.getAll() - .then ({ uuid, deviceApiKey }) -> - uuid ?= deviceRegister.generateUniqueKey() - deviceApiKey ?= deviceRegister.generateUniqueKey() - configJson.set({ uuid, deviceApiKey }) - .then -> - configJson.get('uuid') - .catch (err) -> - console.log('Error generating and saving UUID: ', err) - Promise.delay(config.bootstrapRetryDelay) - .then -> - generateRegistration() - -bootstrapOrRetry = -> - utils.mixpanelTrack('Device bootstrap') - # If we're in offline mode, we don't start the provisioning process so bootstrap.done will never fulfill - return if bootstrapper.offlineMode - bootstrap().catch (err) -> - utils.mixpanelTrack('Device bootstrap failed, retrying', { error: err, delay: config.bootstrapRetryDelay }) - setTimeout(bootstrapOrRetry, config.bootstrapRetryDelay) - -hasDeviceApiKeySupport = (osVersion) -> - try - !/^Resin OS /.test(osVersion) or semver.gte(semverRegex().exec(osVersion)[0], '2.0.2') - catch err - console.error('Unable to determine if device has deviceApiKey support', err, err.stack) - false - -exchangeKeyAndUpdateConfig = -> - # Only do a key exchange and delete the provisioning key if we're on a Resin OS version - # that supports using the deviceApiKey (2.0.2 and above) - # or if we're in a non-Resin OS (which is assumed to be updated enough). - # Otherwise VPN and other host services that use an API key will break. - # - # In other cases, we make the apiKey equal the deviceApiKey instead. - Promise.join( - configJson.getAll() - osRelease.getOSVersion(config.hostOSVersionPath) - (userConfig, osVersion) -> - hasSupport = hasDeviceApiKeySupport(osVersion) - if hasSupport or userConfig.apiKey != userConfig.deviceApiKey - console.log('Attempting key exchange') - exchangeKey() - .then -> - configJson.get('deviceApiKey') - .then (deviceApiKey) -> - console.log('Key exchange succeeded, starting to use deviceApiKey') - utils.setConfig('deviceApiKey', deviceApiKey) - .then -> - utils.setConfig('apiKey', deviceApiKey) - .then -> - if hasSupport - configJson.set({}, [ 'apiKey' ]) - else - configJson.set(apiKey: deviceApiKey) - ) - -exchangeKeyOrRetry = do -> - _failedExchanges = 0 - return -> - exchangeKeyAndUpdateConfig() - .catch (err) -> - console.error('Error exchanging API key, will retry', err, err.stack) - delay = Math.min((2 ** _failedExchanges) * config.bootstrapRetryDelay, 24 * 60 * 60 * 1000) - _failedExchanges += 1 - setTimeout(exchangeKeyOrRetry, delay) - return - -bootstrapper.done = new Promise (resolve) -> - bootstrapper.doneBootstrapping = -> - configJson.getAll() - .then (userConfig) -> - bootstrapper.bootstrapped = true - resolve(userConfig) - # If we're still using an old api key we can try to exchange it for a valid device key - # This will only be the case when the supervisor/OS has been updated. - if userConfig.apiKey? 
- exchangeKeyOrRetry() - else - Promise.join( - knex('config').select('value').where(key: 'apiKey') - knex('config').select('value').where(key: 'deviceApiKey') - ([ apiKey ], [ deviceApiKey ]) -> - if !deviceApiKey?.value - # apiKey in the DB is actually the deviceApiKey, but it was - # exchanged in a supervisor version that didn't save it to the DB - # (which mainly affects the RESIN_API_KEY env var) - knex('config').insert({ key: 'deviceApiKey', value: apiKey.value }) - ) - return - -bootstrapper.bootstrapped = false -bootstrapper.startBootstrapping = -> - # Load config file - configJson.init() - .then -> - configJson.getAll() - .then (userConfig) -> - bootstrapper.offlineMode = !Boolean(config.apiEndpoint) or Boolean(userConfig.supervisorOfflineMode) - knex('config').select('value').where(key: 'uuid') - .then ([ uuid ]) -> - if uuid?.value - bootstrapper.doneBootstrapping() if !bootstrapper.offlineMode - return uuid.value - console.log('New device detected. Bootstrapping..') - - generateRegistration() - .tap -> - loadPreloadedApps() - .tap (uuid) -> - bootstrapOrRetry() if !bootstrapper.offlineMode - # Don't wait on bootstrapping here, bootstrapper.done is for that. - return - -module.exports = bootstrapper diff --git a/src/config-json.coffee b/src/config-json.coffee deleted file mode 100644 index b1c878b1..00000000 --- a/src/config-json.coffee +++ /dev/null @@ -1,46 +0,0 @@ -Promise = require 'bluebird' -_ = require 'lodash' -configPath = '/boot/config.json' -Lock = require 'rwlock' -fs = Promise.promisifyAll(require('fs')) -{ writeAndSyncFile } = require './lib/fs-utils' - -lock = new Lock() -writeLock = Promise.promisify(lock.async.writeLock) -readLock = Promise.promisify(lock.async.readLock) -withWriteLock = do -> - _takeLock = -> - writeLock('config') - .disposer (release) -> - release() - return (func) -> - Promise.using(_takeLock(), func) -withReadLock = do -> - _takeLock = -> - readLock('config') - .disposer (release) -> - release() - return (func) -> - Promise.using(_takeLock(), func) - -# write-through cache of the config.json file -userConfig = null -exports.init = -> - fs.readFileAsync(configPath) - .then (conf) -> - userConfig = JSON.parse(conf) - -exports.get = (key) -> - withReadLock -> - return userConfig[key] - -exports.getAll = -> - withReadLock -> - return _.clone(userConfig) - -exports.set = (vals = {}, keysToDelete = []) -> - withWriteLock -> - _.merge(userConfig, vals) - for key in keysToDelete - delete userConfig[key] - writeAndSyncFile(configPath, JSON.stringify(userConfig)) diff --git a/src/device.coffee b/src/device.coffee deleted file mode 100644 index aa99678e..00000000 --- a/src/device.coffee +++ /dev/null @@ -1,271 +0,0 @@ -_ = require 'lodash' -Promise = require 'bluebird' -memoizee = require 'memoizee' -utils = require './utils' -{ resinApi } = require './request' -device = exports -config = require './config' -configPath = '/boot/config.json' -execAsync = Promise.promisify(require('child_process').exec) -fs = Promise.promisifyAll(require('fs')) -bootstrap = require './bootstrap' -{ checkTruthy } = require './lib/validation' -osRelease = require './lib/os-release' -EventEmitter = require 'events' - -# If we don't use promise: 'then', exceptions will crash the program -memoizePromise = _.partial(memoizee, _, promise: 'then') - - -exports.getID = memoizePromise -> - bootstrap.done - .then -> - Promise.all([ - utils.getConfig('apiKey') - utils.getConfig('uuid') - ]) - .spread (apiKey, uuid) -> - resinApi.get( - resource: 'device' - options: - select: 'id' - 
filter: - uuid: uuid - customOptions: - apikey: apiKey - ) - .timeout(config.apiTimeout) - .then (devices) -> - if devices.length is 0 - throw new Error('Could not find this device?!') - return devices[0].id - -exports.shuttingDown = false -exports.events = new EventEmitter() -exports.reboot = -> - utils.gosuper.postAsync('/v1/reboot', { json: true }) - .spread (res, body) -> - if res.statusCode != 202 - throw new Error(body.Error) - exports.shuttingDown = true - exports.events.emit('shutdown') - return body - -exports.shutdown = -> - utils.gosuper.postAsync('/v1/shutdown', { json: true }) - .spread (res, body) -> - if res.statusCode != 202 - throw new Error(body.Error) - exports.shuttingDown = true - exports.events.emit('shutdown') - return body - -exports.hostConfigConfigVarPrefix = 'RESIN_HOST_' -bootConfigEnvVarPrefix = 'RESIN_HOST_CONFIG_' -bootBlockDevice = '/dev/mmcblk0p1' -bootMountPoint = '/mnt/root' + config.bootMountPoint -bootConfigPath = bootMountPoint + '/config.txt' -configRegex = new RegExp('(' + _.escapeRegExp(bootConfigEnvVarPrefix) + ')(.+)') -forbiddenConfigKeys = [ - 'disable_commandline_tags' - 'cmdline' - 'kernel' - 'kernel_address' - 'kernel_old' - 'ramfsfile' - 'ramfsaddr' - 'initramfs' - 'device_tree_address' - 'init_uart_baud' - 'init_uart_clock' - 'init_emmc_clock' - 'boot_delay' - 'boot_delay_ms' - 'avoid_safe_mode' -] -parseBootConfigFromEnv = (env) -> - # We ensure env doesn't have garbage - parsedEnv = _.pickBy env, (val, key) -> - return _.startsWith(key, bootConfigEnvVarPrefix) - parsedEnv = _.mapKeys parsedEnv, (val, key) -> - key.replace(configRegex, '$2') - parsedEnv = _.omit(parsedEnv, forbiddenConfigKeys) - return parsedEnv - -exports.setHostConfig = (env, oldEnv, logMessage) -> - Promise.join setBootConfig(env, oldEnv, logMessage), setLogToDisplay(env, oldEnv, logMessage), (bootConfigApplied, logToDisplayChanged) -> - return (bootConfigApplied or logToDisplayChanged) - -setLogToDisplay = (env, oldEnv, logMessage) -> - if env['RESIN_HOST_LOG_TO_DISPLAY']? - enable = checkTruthy(env['RESIN_HOST_LOG_TO_DISPLAY']) ? true - utils.gosuper.postAsync('/v1/set-log-to-display', { json: true, body: Enable: enable }) - .spread (response, body) -> - if response.statusCode != 200 - logMessage("Error setting log to display: #{body.Error}, Status:, #{response.statusCode}", { error: body.Error }, 'Set log to display error') - return false - else - if body.Data == true - logMessage("#{if enable then 'Enabled' else 'Disabled'} logs to display") - return body.Data - .catch (err) -> - logMessage("Error setting log to display: #{err}", { error: err }, 'Set log to display error') - return false - else - return Promise.resolve(false) - -setBootConfig = (env, oldEnv, logMessage) -> - device.getDeviceType() - .then (deviceType) -> - throw new Error('This is not a Raspberry Pi') if !_.startsWith(deviceType, 'raspberry') - Promise.join parseBootConfigFromEnv(env), parseBootConfigFromEnv(oldEnv), fs.readFileAsync(bootConfigPath, 'utf8'), (configFromApp, oldConfigFromApp, configTxt ) -> - throw new Error('No boot config to change') if _.isEqual(configFromApp, oldConfigFromApp) - configFromFS = {} - configPositions = [] - configStatements = configTxt.split(/\r?\n/) - _.each configStatements, (configStr) -> - keyValue = /^([^#=]+)=(.+)/.exec(configStr) - if keyValue? 
- configPositions.push(keyValue[1]) - configFromFS[keyValue[1]] = keyValue[2] - else - # This will ensure config.txt filters are in order - configPositions.push(configStr) - # configFromApp and configFromFS now have compatible formats - keysFromApp = _.keys(configFromApp) - keysFromOldConf = _.keys(oldConfigFromApp) - keysFromFS = _.keys(configFromFS) - toBeAdded = _.difference(keysFromApp, keysFromFS) - toBeDeleted = _.difference(keysFromOldConf, keysFromApp) - toBeChanged = _.intersection(keysFromApp, keysFromFS) - toBeChanged = _.filter toBeChanged, (key) -> - configFromApp[key] != configFromFS[key] - throw new Error('Nothing to change') if _.isEmpty(toBeChanged) and _.isEmpty(toBeAdded) and _.isEmpty(toBeDeleted) - - logMessage("Applying boot config: #{JSON.stringify(configFromApp)}", {}, 'Apply boot config in progress') - # We add the keys to be added first so they are out of any filters - outputConfig = _.map toBeAdded, (key) -> "#{key}=#{configFromApp[key]}" - outputConfig = outputConfig.concat _.map configPositions, (key, index) -> - configStatement = null - if _.includes(toBeChanged, key) - configStatement = "#{key}=#{configFromApp[key]}" - else if !_.includes(toBeDeleted, key) - configStatement = configStatements[index] - return configStatement - # Here's the dangerous part: - execAsync("mount -t vfat -o remount,rw #{bootBlockDevice} #{bootMountPoint}") - .then -> - fs.writeFileAsync(bootConfigPath + '.new', _.reject(outputConfig, _.isNil).join('\n')) - .then -> - fs.renameAsync(bootConfigPath + '.new', bootConfigPath) - .then -> - execAsync('sync') - .then -> - logMessage("Applied boot config: #{JSON.stringify(configFromApp)}", {}, 'Apply boot config success') - return true - .catch (err) -> - logMessage("Error setting boot config: #{err}", { error: err }, 'Apply boot config error') - throw err - .catch (err) -> - console.log('Will not set boot config: ', err) - return false - -exports.getDeviceType = memoizePromise -> - fs.readFileAsync(configPath, 'utf8') - .then(JSON.parse) - .then (configFromFile) -> - if !configFromFile.deviceType? - throw new Error('Device type not specified in config file') - return configFromFile.deviceType - -do -> - APPLY_STATE_SUCCESS_DELAY = 1000 - APPLY_STATE_RETRY_DELAY = 5000 - applyPending = false - targetState = {} - actualState = {} - updateState = { update_pending: false, update_failed: false, update_downloaded: false } - reportErrors = 0 - - exports.stateReportHealthy = -> - return !(utils.isConnectivityCheckEnabled() and utils.connected() and reportErrors > 3) - - getStateDiff = -> - _.omitBy targetState, (value, key) -> - actualState[key] is value - - applyState = -> - stateDiff = getStateDiff() - if _.size(stateDiff) is 0 - applyPending = false - return - applyPending = true - Promise.join( - utils.getConfig('apiKey') - device.getID() - (apiKey, deviceID) -> - stateDiff = getStateDiff() - if _.size(stateDiff) is 0 || !apiKey? - return - resinApi.patch - resource: 'device' - id: deviceID - body: stateDiff - customOptions: - apikey: apiKey - .timeout(config.apiTimeout) - .then -> - reportErrors = 0 - # Update the actual state. - _.merge(actualState, stateDiff) - ) - .delay(APPLY_STATE_SUCCESS_DELAY) - .catch (error) -> - reportErrors += 1 - utils.mixpanelTrack('Device info update failure', { error, stateDiff }) - # Delay 5s before retrying a failed update - Promise.delay(APPLY_STATE_RETRY_DELAY) - .finally -> - # Check if any more state diffs have appeared whilst we've been processing this update. 
- setImmediate(applyState) - - exports.setUpdateState = (value) -> - _.merge(updateState, value) - - exports.getState = -> - fieldsToOmit = ['api_secret', 'logs_channel', 'provisioning_progress', 'provisioning_state'] - state = _.omit(targetState, fieldsToOmit) - _.merge(state, updateState) - return state - - # Calling this function updates the local device state, which is then used to synchronise - # the remote device state, repeating any failed updates until successfully synchronised. - # This function will also optimise updates by merging multiple updates and only sending the latest state. - exports.updateState = (updatedState = {}, retry = false) -> - # Remove any updates that match the last we successfully sent. - _.merge(targetState, updatedState) - - # Only trigger applying state if an apply isn't already in progress. - if !applyPending - applyState() - return - -exports.getOSVersion = memoizePromise -> - osRelease.getOSVersion(config.hostOSVersionPath) - -exports.isResinOSv1 = memoizePromise -> - exports.getOSVersion().then (osVersion) -> - return true if /^Resin OS 1./.test(osVersion) - return false - -exports.getOSVariant = memoizePromise -> - osRelease.getOSVariant(config.hostOSVersionPath) - -do -> - _gosuperHealthy = true - exports.gosuperHealthy = -> - return _gosuperHealthy - exports.reportUnhealthyGosuper = -> - _gosuperHealthy = false - exports.reportHealthyGosuper = -> - _gosuperHealthy = true diff --git a/src/docker-utils.coffee b/src/docker-utils.coffee deleted file mode 100644 index b6b958c3..00000000 --- a/src/docker-utils.coffee +++ /dev/null @@ -1,223 +0,0 @@ -config = require './config' -process.env.DOCKER_HOST ?= "unix://#{config.dockerSocket}" - -Docker = require 'docker-toolbelt' -{ DockerProgress } = require 'docker-progress' -Promise = require 'bluebird' -dockerDelta = require 'docker-delta' - -_ = require 'lodash' -knex = require './db' -{ request, resumable } = require './request' -Lock = require 'rwlock' - -exports.docker = docker = new Docker() -dockerProgress = new DockerProgress(dockerToolbelt: docker) - -getRepoAndTag = (image) -> - docker.getRegistryAndName(image) - .then ({ registry, imageName, tagName }) -> - if registry? 
and registry != 'docker.io' - registry = registry.toString().replace(':443', '') + '/' - else - registry = '' - return { repo: "#{registry}#{imageName}", tag: tagName } - -applyDelta = (imgSrc, deltaUrl, applyTimeout, opts, onProgress) -> - new Promise (resolve, reject) -> - req = resumable(Object.assign({ url: deltaUrl }, opts)) - .on('progress', onProgress) - .on('retry', onProgress) - .on('error', reject) - .on 'response', (res) -> - if res.statusCode isnt 200 - reject(new Error("Got #{res.statusCode} when requesting delta from storage.")) - else if parseInt(res.headers['content-length']) is 0 - reject(new Error('Invalid delta URL.')) - else - deltaStream = dockerDelta.applyDelta(imgSrc, timeout: applyTimeout) - res.pipe(deltaStream) - .on('id', resolve) - .on('error', req.abort.bind(req)) - -do -> - _lock = new Lock() - _writeLock = Promise.promisify(_lock.async.writeLock) - _readLock = Promise.promisify(_lock.async.readLock) - writeLockImages = -> - _writeLock('images') - .disposer (release) -> - release() - readLockImages = -> - _readLock('images') - .disposer (release) -> - release() - - exports.rsyncImageWithProgress = (imgDest, opts, onProgress) -> - { requestTimeout, applyTimeout, retryCount, retryInterval, uuid, apiKey, deltaSource, startFromEmpty = false } = opts - Promise.using readLockImages(), -> - Promise.try -> - if startFromEmpty or !deltaSource? - return 'resin/scratch' - else - docker.getImage(deltaSource).inspect() - .then -> - return deltaSource - .catch -> - return 'resin/scratch' - .then (imgSrc) -> - # I'll leave this debug log here in case we ever wonder what delta source a device is using in production - console.log("Using delta source #{imgSrc}") - Promise.join docker.getRegistryAndName(imgDest), docker.getRegistryAndName(imgSrc), (dstInfo, srcInfo) -> - tokenEndpoint = "#{config.apiEndpoint}/auth/v1/token" - opts = - auth: - user: 'd_' + uuid - pass: apiKey - sendImmediately: true - json: true - timeout: requestTimeout - url = "#{tokenEndpoint}?service=#{dstInfo.registry}&scope=repository:#{dstInfo.imageName}:pull&scope=repository:#{srcInfo.imageName}:pull" - request.getAsync(url, opts) - .get(1) - .then (b) -> - opts = - followRedirect: false - timeout: requestTimeout - - if b?.token? - opts.auth = - bearer: b.token - sendImmediately: true - new Promise (resolve, reject) -> - request.get("#{config.deltaHost}/api/v2/delta?src=#{imgSrc}&dest=#{imgDest}", opts) - .on 'response', (res) -> - res.resume() # discard response body -- we only care about response headers - if res.statusCode in [ 502, 504 ] - reject(new Error('Delta server is still processing the delta, will retry')) - else if not (300 <= res.statusCode < 400 and res.headers['location']?) 
- reject(new Error("Got #{res.statusCode} when requesting image from delta server.")) - else - deltaUrl = res.headers['location'] - if imgSrc is 'resin/scratch' - deltaSrc = null - else - deltaSrc = imgSrc - resumeOpts = { timeout: requestTimeout, maxRetries: retryCount, retryInterval } - resolve(applyDelta(deltaSrc, deltaUrl, applyTimeout, resumeOpts, onProgress)) - .on 'error', reject - .then (id) -> - getRepoAndTag(imgDest) - .then ({ repo, tag }) -> - docker.getImage(id).tag({ repo, tag, force: true }) - .catch dockerDelta.OutOfSyncError, (err) -> - throw err if startFromEmpty - console.log('Falling back to delta-from-empty') - opts.startFromEmpty = true - exports.rsyncImageWithProgress(imgDest, opts, onProgress) - - exports.fetchImageWithProgress = (image, onProgress, { uuid, apiKey }) -> - Promise.using readLockImages(), -> - docker.getRegistryAndName(image) - .then ({ registry, imageName, tagName }) -> - dockerOptions = - authconfig: - username: 'd_' + uuid, - password: apiKey, - serveraddress: registry - dockerProgress.pull(image, onProgress, dockerOptions) - - normalizeRepoTag = (image) -> - getRepoAndTag(image) - .then ({ repo, tag }) -> - buildRepoTag(repo, tag) - - supervisorTagPromise = normalizeRepoTag(config.supervisorImage) - - exports.cleanupContainersAndImages = (extraImagesToIgnore = []) -> - Promise.using writeLockImages(), -> - Promise.join( - knex('app').select() - .map (app) -> - app.imageId = normalizeRepoTag(app.imageId) - return Promise.props(app) - knex('dependentApp').select().whereNotNull('imageId') - .map ({ imageId }) -> - return normalizeRepoTag(imageId) - supervisorTagPromise - docker.listImages() - .map (image) -> - image.NormalizedRepoTags = Promise.map(image.RepoTags, normalizeRepoTag) - return Promise.props(image) - Promise.map(extraImagesToIgnore, normalizeRepoTag) - (apps, dependentApps, supervisorTag, images, normalizedExtraImages) -> - appNames = _.map(apps, 'containerName') - appImages = _.map(apps, 'imageId') - imageTags = _.map(images, 'NormalizedRepoTags') - appTags = _.filter imageTags, (tags) -> - _.some tags, (tag) -> - _.includes(appImages, tag) or _.includes(dependentApps, tag) - appTags = _.flatten(appTags) - supervisorTags = _.filter imageTags, (tags) -> - _.includes(tags, supervisorTag) - supervisorTags = _.flatten(supervisorTags) - extraTags = _.filter imageTags, (tags) -> - _.some(tags, (tag) -> _.includes(normalizedExtraImages, tag)) - extraTags = _.flatten(extraTags) - allProtectedTags = _.union(appTags, supervisorTags, extraTags) - # Cleanup containers first, so that they don't block image removal. - docker.listContainers(all: true) - .filter (containerInfo) -> - # Do not remove user apps. 
- normalizeRepoTag(containerInfo.Image) - .then (repoTag) -> - if _.includes(appTags, repoTag) - return !_.some containerInfo.Names, (name) -> - _.some appNames, (appContainerName) -> "/#{appContainerName}" == name - if _.includes(supervisorTags, repoTag) - return containerHasExited(containerInfo.Id) - return true - .map (containerInfo) -> - docker.getContainer(containerInfo.Id).remove(v: true, force: true) - .then -> - console.log('Deleted container:', containerInfo.Id, containerInfo.Image) - .catch(_.noop) - .then -> - imagesToClean = _.reject images, (image) -> - _.some(image.NormalizedRepoTags, (tag) -> _.includes(allProtectedTags, tag)) - Promise.map imagesToClean, (image) -> - Promise.map image.RepoTags.concat(image.Id), (tag) -> - docker.getImage(tag).remove(force: true) - .then -> - console.log('Deleted image:', tag, image.Id, image.RepoTags) - .catch(_.noop) - ) - - containerHasExited = (id) -> - docker.getContainer(id).inspect() - .then (data) -> - return not data.State.Running - - buildRepoTag = (repo, tag, registry) -> - repoTag = '' - if registry? - repoTag += registry + '/' - repoTag += repo - if tag? - repoTag += ':' + tag - else - repoTag += ':latest' - return repoTag - - exports.getImageEnv = (id) -> - docker.getImage(id).inspect() - .get('Config').get('Env') - .then (env) -> - # env is an array of strings that say 'key=value' - _(env) - .invokeMap('split', '=') - .fromPairs() - .value() - .catch (err) -> - console.log('Error getting env from image', err, err.stack) - return {} diff --git a/src/lib/logger.coffee b/src/lib/logger.coffee deleted file mode 100644 index 873b6141..00000000 --- a/src/lib/logger.coffee +++ /dev/null @@ -1,106 +0,0 @@ -_ = require 'lodash' -Docker = require 'docker-toolbelt' -PUBNUB = require 'pubnub' -Promise = require 'bluebird' -es = require 'event-stream' -Lock = require 'rwlock' -{ docker } = require '../docker-utils' - -LOG_PUBLISH_INTERVAL = 110 - -# Pubnub's message size limit is 32KB (unclear on whether it's KB or actually KiB, -# but we'll be conservative). So we limit a log message to 2 bytes less to account -# for the [ and ] in the array. -MAX_LOG_BYTE_SIZE = 30000 -MAX_MESSAGE_INDEX = 9 - -disableLogs = false - -initialised = new Promise (resolve) -> - exports.init = (config) -> - resolve(config) - -# Queue up any calls to publish logs whilst we wait to be initialised. 
-publish = do -> - publishQueue = [[]] - messageIndex = 0 - publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE - logsOverflow = false - - initialised.then (config) -> - if config.offlineMode - publish = _.noop - publishQueue = null - return - pubnub = PUBNUB.init(config.pubnub) - channel = config.channel - doPublish = -> - return if publishQueue[0].length is 0 - message = publishQueue.shift() - pubnub.publish({ channel, message }) - if publishQueue.length is 0 - publishQueue = [[]] - publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE - messageIndex = Math.max(messageIndex - 1, 0) - logsOverflow = false if messageIndex < MAX_MESSAGE_INDEX - setInterval(doPublish, LOG_PUBLISH_INTERVAL) - - return (message) -> - # Disable sending logs for bandwidth control - return if disableLogs or (messageIndex >= MAX_MESSAGE_INDEX and publishQueueRemainingBytes <= 0) - if _.isString(message) - message = { m: message } - - _.defaults message, - t: Date.now() - m: '' - msgLength = Buffer.byteLength(encodeURIComponent(JSON.stringify(message)), 'utf8') - return if msgLength > MAX_LOG_BYTE_SIZE # Unlikely, but we can't allow this - remaining = publishQueueRemainingBytes - msgLength - if remaining >= 0 - publishQueue[messageIndex].push(message) - publishQueueRemainingBytes = remaining - else if messageIndex < MAX_MESSAGE_INDEX - messageIndex += 1 - publishQueue[messageIndex] = [ message ] - publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE - msgLength - else if !logsOverflow - logsOverflow = true - messageIndex += 1 - publishQueue[messageIndex] = [ { m: 'Warning! Some logs dropped due to high load', t: Date.now(), s: 1 } ] - publishQueueRemainingBytes = 0 - -# disable: A Boolean to pause the Log Publishing - Logs are lost when paused. -exports.disableLogPublishing = (disable) -> - disableLogs = disable - -exports.log = -> - publish(arguments...) 
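
The publish queue above enforces two limits: a batch stays under MAX_LOG_BYTE_SIZE (30000 bytes, conservatively below Pubnub's 32KB publish ceiling) and at most MAX_MESSAGE_INDEX + 1 = 10 batches are buffered, one of which is flushed every LOG_PUBLISH_INTERVAL (110ms). A minimal sketch of the same byte-budgeted batching, with a hypothetical `send` standing in for pubnub.publish:

    MAX_BATCH_BYTES = 30000
    MAX_BATCHES = 10

    queue = [ [] ]
    remaining = MAX_BATCH_BYTES

    enqueue = (msg) ->
      size = Buffer.byteLength(JSON.stringify(msg), 'utf8')
      return if size > MAX_BATCH_BYTES # a single oversized message is dropped outright
      if remaining >= size
        queue[queue.length - 1].push(msg) # fits in the current batch
        remaining -= size
      else if queue.length < MAX_BATCHES
        queue.push([ msg ]) # current batch is full: start a fresh one
        remaining = MAX_BATCH_BYTES - size
      # else: drop (the code above additionally queues one 'logs dropped' warning)

    drain = (send) ->
      return if queue[0].length is 0
      send(queue.shift()) # oldest batch first
      if queue.length is 0 # everything flushed: reset the budget
        queue = [ [] ]
        remaining = MAX_BATCH_BYTES

    # setInterval((-> drain(send)), 110)

Under sustained load this trades loss for boundedness: buffered logs are capped at roughly 10 x 30000 bytes no matter how fast lines arrive.
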
- -do -> - _lock = new Lock() - _writeLock = Promise.promisify(_lock.async.writeLock) - loggerLock = (containerName) -> - _writeLock(containerName) - .disposer (release) -> - release() - - attached = {} - exports.attach = (app) -> - Promise.using loggerLock(app.containerName), -> - if !attached[app.containerName] - docker.getContainer(app.containerName) - .logs({ follow: true, stdout: true, stderr: true, timestamps: true }) - .then (stream) -> - attached[app.containerName] = true - stream.pipe(es.split()) - .on 'data', (logLine) -> - space = logLine.indexOf(' ') - if space > 0 - msg = { t: logLine.substr(0, space), m: logLine.substr(space + 1) } - publish(msg) - .on 'error', (err) -> - console.error('Error on container logs', err, err.stack) - attached[app.containerName] = false - .on 'end', -> - attached[app.containerName] = false diff --git a/src/lib/random-hex-string.coffee b/src/lib/random-hex-string.coffee deleted file mode 100644 index ae109732..00000000 --- a/src/lib/random-hex-string.coffee +++ /dev/null @@ -1,9 +0,0 @@ -Promise = require('bluebird') -crypto = require('crypto') - -exports.generate = (length = 32, callback) -> - Promise.try -> - crypto.randomBytes(length).toString('hex') - .catch -> - Promise.delay(1).then(exports.generate) - .nodeify(callback) diff --git a/src/request.coffee b/src/request.coffee deleted file mode 100644 index fe42d4eb..00000000 --- a/src/request.coffee +++ /dev/null @@ -1,52 +0,0 @@ -config = require './config' -PlatformAPI = require 'pinejs-client' -Promise = require 'bluebird' -request = require 'request' -resumable = require 'resumable-request' -url = require 'url' -osRelease = require './lib/os-release' - -osVersion = osRelease.getOSVersionSync(config.hostOSVersionPath) -osVariant = osRelease.getOSVariantSync(config.hostOSVersionPath) -supervisorVersion = require('./lib/supervisor-version') - -userAgent = "Supervisor/#{supervisorVersion}" -if osVersion? - if osVariant? - userAgent += " (Linux; #{osVersion}; #{osVariant})" - else - userAgent += " (Linux; #{osVersion})" - -# With these settings, the device must be unable to receive a single byte -# from the network for a continuous period of 20 minutes before we give up. 
-# (reqTimeout + retryInterval) * retryCount / 1000ms / 60sec ~> minutes -DEFAULT_REQUEST_TIMEOUT = 30000 # ms -DEFAULT_REQUEST_RETRY_INTERVAL = 10000 # ms -DEFAULT_REQUEST_RETRY_COUNT = 30 - -requestOpts = - gzip: true - timeout: DEFAULT_REQUEST_TIMEOUT - headers: - 'User-Agent': userAgent - -resumableOpts = - timeout: DEFAULT_REQUEST_TIMEOUT - maxRetries: DEFAULT_REQUEST_RETRY_COUNT - retryInterval: DEFAULT_REQUEST_RETRY_INTERVAL - -try - PLATFORM_ENDPOINT = url.resolve(config.apiEndpoint, '/v2/') - exports.resinApi = resinApi = new PlatformAPI - apiPrefix: PLATFORM_ENDPOINT - passthrough: requestOpts - exports.cachedResinApi = resinApi.clone({}, cache: {}) -catch - exports.resinApi = {} - exports.cachedResinApi = {} - -request = request.defaults(requestOpts) - -exports.request = Promise.promisifyAll(request, multiArgs: true) - -exports.resumable = resumable.defaults(resumableOpts) diff --git a/src/utils.coffee b/src/utils.coffee deleted file mode 100644 index d8ead9dd..00000000 --- a/src/utils.coffee +++ /dev/null @@ -1,349 +0,0 @@ -Promise = require 'bluebird' -_ = require 'lodash' -fs = Promise.promisifyAll require 'fs' -config = require './config' -knex = require './db' -mixpanel = require 'mixpanel' -networkCheck = require 'network-checker' -blink = require('blinking')(config.ledFile) -url = require 'url' -randomHexString = require './lib/random-hex-string' -{ request } = require './request' -logger = require './lib/logger' -TypedError = require 'typed-error' -execAsync = Promise.promisify(require('child_process').exec) -device = require './device' -{ checkTruthy } = require './lib/validation' -mask = require 'json-mask' - -exports.supervisorVersion = require('./lib/supervisor-version') - -configJson = JSON.parse(fs.readFileSync('/boot/config.json')) -if Boolean(config.apiEndpoint) and !Boolean(configJson.supervisorOfflineMode) - mixpanelClient = mixpanel.init(config.mixpanelToken, { host: config.mixpanelHost }) -else - mixpanelClient = { track: _.noop } - -exports.mixpanelProperties = mixpanelProperties = - username: configJson.username - -mixpanelMask = [ - 'appId' - 'delay' - 'error' - 'interval' - 'app(appId,imageId,commit,name)' - 'stateDiff(status,download_progress,commit,os_version,superisor_version,ip_address)' -].join(',') - -exports.mixpanelTrack = (event, properties = {}) -> - # Allow passing in an error directly and having it assigned to the error property. 
- if properties instanceof Error - properties = error: properties - - # If the properties has an error argument that is an Error object then it treats it nicely, - # rather than letting it become `{}` - if properties.error instanceof Error - properties.error = - message: properties.error.message - stack: properties.error.stack - - properties = _.cloneDeep(properties) - - # Filter properties to only send the whitelisted keys and values - properties = mask(properties, mixpanelMask) - console.log('Event:', event, JSON.stringify(properties)) - # Mutation is bad, and it should feel bad - properties = _.assign(properties, mixpanelProperties) - - mixpanelClient.track(event, properties) - -networkPattern = - blinks: 4 - pause: 1000 - -exports.blink = blink - -pauseConnectivityCheck = false -disableConnectivityCheck = false - -# options: An object of net.connect options, with the addition of: -# timeout: 10s -checkHost = (options) -> - if !isConnectivityCheckEnabled() - return true - else - return networkCheck.checkHost(options) - -exports.isConnectivityCheckEnabled = isConnectivityCheckEnabled = -> - return !disableConnectivityCheck and !pauseConnectivityCheck - -# Custom monitor that uses checkHost function above. -customMonitor = (options, fn) -> - networkCheck.monitor(checkHost, options, fn) - -# pause: A Boolean to pause the connectivity checks -exports.pauseCheck = (pause) -> - pauseConnectivityCheck = pause - -# disable: A Boolean to disable the connectivity checks -exports.disableCheck = disableCheck = (disable) -> - disableConnectivityCheck = disable - -# Call back for inotify triggered when the VPN status is changed. -vpnStatusInotifyCallback = -> - fs.lstatAsync(config.vpnStatusPath + '/active') - .then -> - pauseConnectivityCheck = true - .catch -> - pauseConnectivityCheck = false - -# Use the following to catch EEXIST errors -EEXIST = (err) -> err.code is 'EEXIST' - -do -> - _connected = true - exports.connected = -> - return _connected - - exports.connectivityCheck = _.once -> - if !config.apiEndpoint? - console.log('No apiEndpoint specified, skipping connectivity check') - return - parsedUrl = url.parse(config.apiEndpoint) - fs.mkdirAsync(config.vpnStatusPath) - .catch EEXIST, (err) -> - console.log('VPN status path exists.') - .then -> - fs.watch(config.vpnStatusPath, vpnStatusInotifyCallback) - - # Manually trigger the call back to detect cases when VPN was switched on before the supervisor starts. - vpnStatusInotifyCallback() - customMonitor - host: parsedUrl.hostname - port: parsedUrl.port ? (if parsedUrl.protocol is 'https:' then 443 else 80) - interval: 10 * 1000 - (connected) -> - _connected = connected - if connected - console.log('Internet Connectivity: OK') - blink.pattern.stop() - else - console.log('Waiting for connectivity...') - blink.pattern.start(networkPattern) - - -secretPromises = {} -generateSecret = (name) -> - Promise.try -> - return config.forceSecret[name] if config.forceSecret[name]? 
- return randomHexString.generate() - .then (newSecret) -> - secretInDB = { key: "#{name}Secret", value: newSecret } - knex('config').update(secretInDB).where(key: "#{name}Secret") - .then (affectedRows) -> - knex('config').insert(secretInDB) if affectedRows == 0 - .return(newSecret) - -exports.newSecret = (name) -> - secretPromises[name] ?= Promise.resolve() - secretPromises[name] = secretPromises[name].then -> - generateSecret(name) - -exports.getOrGenerateSecret = (name) -> - secretPromises[name] ?= knex('config').select('value').where(key: "#{name}Secret").then ([ secret ]) -> - return secret.value if secret? - generateSecret(name) - return secretPromises[name] - -exports.getConfig = getConfig = (key) -> - knex('config').select('value').where({ key }) - .then ([ conf ]) -> - return conf?.value - -exports.setConfig = (key, value = null) -> - knex('config').update({ value }).where({ key }) - .then (n) -> - knex('config').insert({ key, value }) if n == 0 - -exports.extendEnvVars = (env, uuid, apiKey, appId, appName, commit) -> - host = '127.0.0.1' - newEnv = - RESIN_APP_ID: appId.toString() - RESIN_APP_NAME: appName - RESIN_APP_RELEASE: commit - RESIN_DEVICE_UUID: uuid - RESIN_DEVICE_NAME_AT_INIT: getConfig('name') - RESIN_DEVICE_TYPE: device.getDeviceType() - RESIN_HOST_OS_VERSION: device.getOSVersion() - RESIN_SUPERVISOR_ADDRESS: "http://#{host}:#{config.listenPort}" - RESIN_SUPERVISOR_HOST: host - RESIN_SUPERVISOR_PORT: config.listenPort - RESIN_SUPERVISOR_API_KEY: exports.getOrGenerateSecret('api') - RESIN_SUPERVISOR_VERSION: exports.supervisorVersion - RESIN_API_KEY: apiKey - RESIN: '1' - USER: 'root' - if env? - _.defaults(newEnv, env) - return Promise.props(newEnv) - -# Callback function to enable/disable tcp pings -exports.enableConnectivityCheck = (val) -> - enabled = checkTruthy(val) ? true - disableCheck(!enabled) - console.log("Connectivity check enabled: #{enabled}") - return true - -# Callback function to enable/disable logs -exports.resinLogControl = (val) -> - logEnabled = checkTruthy(val) ? true - logger.disableLogPublishing(!logEnabled) - console.log('Logs enabled: ' + val) - return true - -emptyHostRequest = request.defaults({ headers: Host: '' }) -gosuperRequest = (method, endpoint, options = {}, callback) -> - if _.isFunction(options) - callback = options - options = {} - options.method = method - options.url = config.gosuperAddress + endpoint - emptyHostRequest(options, callback) - -gosuperPost = _.partial(gosuperRequest, 'POST') -gosuperGet = _.partial(gosuperRequest, 'GET') - -exports.gosuper = gosuper = - post: gosuperPost - get: gosuperGet - postAsync: Promise.promisify(gosuperPost, multiArgs: true) - getAsync: Promise.promisify(gosuperGet, multiArgs: true) - -# Callback function to enable/disable VPN -exports.vpnControl = (val, logMessage, { initial = false } = {}) -> - enable = checkTruthy(val) ? 
true - # If it's the initial run, we always want the VPN enabled, so we ignore calls to disable it - if initial and !enable - return Promise.resolve(false) - gosuper.postAsync('/v1/vpncontrol', { json: true, body: Enable: enable }) - .spread (response, body) -> - if response.statusCode == 202 - console.log('VPN enabled: ' + enable) - return true - else - logMessage("Error (#{response.statusCode}) toggling VPN: #{body}", {}, 'Toggle VPN error') - return false - .catchReturn(false) - -exports.restartSystemdService = (serviceName) -> - gosuper.postAsync('/v1/restart-service', { json: true, body: Name: serviceName }) - .spread (response, body) -> - if response.statusCode != 200 - err = new Error("Error restarting service #{serviceName}: #{response.statusCode} #{body}") - err.statusCode = response.statusCode - throw err - -exports.AppNotFoundError = class AppNotFoundError extends TypedError - -exports.getKnexApp = (appId, columns) -> - knex('app').select(columns).where({ appId }) - .then ([ app ]) -> - if !app? - throw new AppNotFoundError('App not found') - return app - -exports.getKnexApps = (columns) -> - knex('app').select(columns) - -exports.defaultVolumes = (includeV1Volumes) -> - volumes = { - '/data': {} - '/lib/modules': {} - '/lib/firmware': {} - '/host/run/dbus': {} - } - if includeV1Volumes - volumes['/host/var/lib/connman'] = {} - volumes['/host_run/dbus'] = {} - return volumes - -exports.getDataPath = (identifier) -> - return config.dataPath + '/' + identifier - -exports.defaultBinds = (dataPath, includeV1Binds) -> - binds = [ - exports.getDataPath(dataPath) + ':/data' - "/tmp/resin-supervisor/#{dataPath}:/tmp/resin" - '/lib/modules:/lib/modules' - '/lib/firmware:/lib/firmware' - '/run/dbus:/host/run/dbus' - ] - if includeV1Binds - binds.push('/run/dbus:/host_run/dbus') - binds.push('/var/lib/connman:/host/var/lib/connman') - return binds - -exports.validComposeOptions = [ - 'command' - 'entrypoint' - 'environment' - 'expose' - 'image' - 'labels' - 'links' - 'net' - 'network_mode' - 'ports' - 'privileged' - 'restart' - 'stop_signal' - 'user' - 'volumes' # Will be overwritten with the default binds - 'working_dir' -] - -exports.validContainerOptions = [ - 'Hostname' - 'User' - 'Env' - 'Labels' - 'Cmd' - 'Entrypoint' - 'Image' - 'Volumes' - 'WorkingDir' - 'ExposedPorts' - 'HostConfig' - 'Name' -] - -exports.validHostConfigOptions = [ - 'Binds' # Will be overwritten with the default binds - 'Links' - 'PortBindings' - 'Privileged' - 'RestartPolicy' - 'NetworkMode' -] - -exports.validateKeys = (options, validSet) -> - Promise.try -> - return if !options? - invalidKeys = _.keys(_.omit(options, validSet)) - throw new Error("Using #{invalidKeys.join(', ')} is not allowed.") if !_.isEmpty(invalidKeys) - -checkAndAddIptablesRule = (rule) -> - execAsync("iptables -C #{rule}") - .catch -> - execAsync("iptables -A #{rule}") - -exports.createIpTablesRules = -> - allowedInterfaces = ['resin-vpn', 'tun0', 'docker0', 'lo'] - Promise.each allowedInterfaces, (iface) -> - checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -i #{iface} -j ACCEPT") - .then -> - checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -j REJECT") - .catch -> - # On systems without REJECT support, fall back to DROP - checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -j DROP")
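
A few notes on the deleted modules, starting with src/config-json.coffee: every access to the in-memory copy of /boot/config.json goes through an rwlock, so concurrent `set` calls cannot interleave their merge-and-write cycles with readers. A minimal sketch of the bluebird disposer pattern it relies on (standalone, not supervisor code):

    Promise = require 'bluebird'
    Lock = require 'rwlock'

    lock = new Lock()
    writeLock = Promise.promisify(lock.async.writeLock)

    # The disposer releases the lock even when `func` rejects or throws
    takeWriteLock = ->
      writeLock('config').disposer (release) ->
        release()

    withWriteLock = (func) ->
      Promise.using(takeWriteLock(), func)

    # withWriteLock -> mutateSharedConfig()
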
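parseBootConfigFromEnv in the deleted src/device.coffee turns RESIN_HOST_CONFIG_* variables into config.txt entries, discarding boot-critical keys. A condensed sketch of the same lodash pipeline (forbidden list abbreviated, values illustrative):

    _ = require 'lodash'

    PREFIX = 'RESIN_HOST_CONFIG_'
    FORBIDDEN = [ 'kernel', 'cmdline', 'initramfs' ] # abbreviated from the full list above

    parseBootConfig = (env) ->
      picked = _.pickBy env, (val, key) -> _.startsWith(key, PREFIX)
      renamed = _.mapKeys picked, (val, key) -> key.slice(PREFIX.length)
      _.omit(renamed, FORBIDDEN)

    # parseBootConfig(RESIN_HOST_CONFIG_gpu_mem: '16', RESIN_HOST_CONFIG_kernel: 'x', PATH: '/bin')
    # -> { gpu_mem: '16' }
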
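setBootConfig in the same file applies config.txt changes by remounting the boot partition read-write, writing a config.txt.new, renaming it over the original, and syncing. The write-then-rename step means an interrupted update leaves either the old or the new file rather than a truncated one (rename is atomic on POSIX filesystems; on the vfat boot partition this is best-effort). A sketch, assuming promisified fs and child_process:

    Promise = require 'bluebird'
    fs = Promise.promisifyAll(require('fs'))
    execAsync = Promise.promisify(require('child_process').exec)

    atomicWrite = (path, contents) ->
      fs.writeFileAsync("#{path}.new", contents)   # stage the full new contents
      .then -> fs.renameAsync("#{path}.new", path) # swap in one step
      .then -> execAsync('sync')                   # push it out to the SD card
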
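The state-reporting loop at the bottom of src/device.coffee keeps a target and an actual state and PATCHes only the fields that differ; concurrent updateState calls merge into targetState, so a retry always sends the latest values. The diff itself is a one-liner:

    _ = require 'lodash'

    getStateDiff = (target, actual) ->
      _.omitBy target, (value, key) -> actual[key] is value

    # getStateDiff({ status: 'Downloading', ip_address: '10.0.0.2' }, { status: 'Downloading' })
    # -> { ip_address: '10.0.0.2' }
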
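The delta download in src/docker-utils.coffee starts with a handshake against the delta server: 502/504 mean the delta is still being generated (retry later), and success is a 3xx whose Location header carries the download URL. A standalone sketch of just that handshake (error classes and retry policy are left to the caller):

    Promise = require 'bluebird'
    request = require 'request'

    getDeltaUrl = (url, opts = {}) ->
      new Promise (resolve, reject) ->
        opts.followRedirect = false # we want the Location header, not the body
        request.get(url, opts)
        .on 'response', (res) ->
          res.resume() # headers are all we need; discard the body
          if res.statusCode in [ 502, 504 ]
            reject(new Error('Delta server is still processing the delta, will retry'))
          else if 300 <= res.statusCode < 400 and res.headers['location']?
            resolve(res.headers['location'])
          else
            reject(new Error("Got #{res.statusCode} when requesting the delta"))
        .on 'error', reject
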
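getImageEnv at the end of src/docker-utils.coffee flattens Docker's Env array of 'KEY=value' strings into an object. One caveat worth knowing: split('=') has no limit and _.fromPairs keeps only the first two elements of each entry, so a value that itself contains '=' gets truncated:

    _ = require 'lodash'

    envArrayToObject = (env) ->
      _(env)
      .invokeMap('split', '=')
      .fromPairs()
      .value()

    # envArrayToObject([ 'TERM=xterm', 'LS_OPTS=--color=auto' ])
    # -> { TERM: 'xterm', LS_OPTS: '--color' } # note the truncated value
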
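The 20-minute figure in the src/request.coffee comment follows directly from the resumable defaults: each attempt may stall for up to DEFAULT_REQUEST_TIMEOUT before failing, plus DEFAULT_REQUEST_RETRY_INTERVAL before the next attempt, across DEFAULT_REQUEST_RETRY_COUNT attempts:

    # (timeout + retryInterval) * retryCount, in minutes:
    (30000 + 10000) * 30 / 1000 / 60 # -> 20
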
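setConfig and generateSecret in src/utils.coffee share an update-then-insert upsert against the knex config table, presumably because the knex/sqlite3 pairing of the time had no native upsert. The pattern, reduced:

    # Try UPDATE first; only INSERT when no row was touched
    upsertConfig = (knex, key, value) ->
      knex('config').update({ value }).where({ key })
      .then (affectedRows) ->
        knex('config').insert({ key, value }) if affectedRows is 0

On its own this races (two concurrent calls could both see zero affected rows and both insert); newSecret sidesteps that by chaining calls per secret name through secretPromises.
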
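extendEnvVars builds the injected RESIN_* environment as a plain object whose values mix string literals with pending promises (device type, OS version, API key), then resolves the whole map at once with Promise.props. A minimal illustration:

    Promise = require 'bluebird'

    buildEnv = ->
      Promise.props
        RESIN: '1'
        RESIN_DEVICE_TYPE: Promise.resolve('raspberrypi3') # stand-in for device.getDeviceType()

    # buildEnv().then (env) -> console.log(env.RESIN_DEVICE_TYPE) # 'raspberrypi3'
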
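Only fields named in mixpanelMask survive the json-mask call in mixpanelTrack, which keeps free-form properties out of analytics. (The 'superisor_version' entry in the stateDiff mask above looks like a typo for 'supervisor_version'; as written, that field would always be filtered out.) json-mask usage:

    mask = require 'json-mask'

    # Anything not in the mask string is stripped
    mask({ appId: 1, token: 'secret', error: 'boom' }, 'appId,error')
    # -> { appId: 1, error: 'boom' }
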
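Finally, createIpTablesRules at the end of src/utils.coffee is idempotent: `iptables -C` exits non-zero when a rule is absent, so the catch-and-append only ever adds each rule once, however often the supervisor restarts; and the REJECT rule degrades to DROP on kernels built without the REJECT target. Reduced to its shape (the port is illustrative):

    Promise = require 'bluebird'
    execAsync = Promise.promisify(require('child_process').exec)

    ensureRule = (rule) ->
      execAsync("iptables -C #{rule}") # succeeds only if the rule already exists
      .catch -> execAsync("iptables -A #{rule}")

    ensureRule('INPUT -p tcp --dport 48484 -j REJECT')
    .catch -> ensureRule('INPUT -p tcp --dport 48484 -j DROP') # REJECT target unavailable
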