app.coffee: Switch to the multicontainer supervisor, add missing dependencies, and remove all files that are not used anymore

Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
Pablo Carranza Velez 2017-11-01 01:17:35 -07:00
parent 14d2bc3f39
commit 5f651c71f7
12 changed files with 4 additions and 2641 deletions


@@ -41,6 +41,7 @@
"memoizee": "^0.4.1",
"mixpanel": "0.0.20",
"mkdirp": "^0.5.1",
"ncp": "^2.0.0",
"network-checker": "~0.0.5",
"node-loader": "^0.6.0",
"null-loader": "^0.1.1",


@@ -1,257 +0,0 @@
Promise = require 'bluebird'
utils = require './utils'
express = require 'express'
bodyParser = require 'body-parser'
bufferEq = require 'buffer-equal-constant-time'
config = require './config'
device = require './device'
_ = require 'lodash'
proxyvisor = require './proxyvisor'
hostConfig = require './host-config'
module.exports = (application) ->
authenticate = (req, res, next) ->
queryKey = req.query.apikey
header = req.get('Authorization') ? ''
match = header.match(/^ApiKey (\w+)$/)
headerKey = match?[1]
utils.getOrGenerateSecret('api')
.then (secret) ->
if queryKey? && bufferEq(new Buffer(queryKey), new Buffer(secret))
next()
else if headerKey? && bufferEq(new Buffer(headerKey), new Buffer(secret))
next()
else if application.localMode
next()
else
res.sendStatus(401)
.catch (err) ->
# This should never happen...
res.status(503).send('Invalid API key in supervisor')
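The middleware above accepts the key either as an apikey query parameter or as an Authorization: ApiKey header, and lets requests through unauthenticated when local mode is enabled. A client-side sketch, assuming the supervisor's usual listen address of 127.0.0.1:48484 (the actual port comes from config.listenPort, not shown in this diff):

# Client-side sketch; the address, port and the 'request' module are assumptions.
request = require 'request'
secret = 'SECRET'  # placeholder for the supervisor API key
request.get("http://127.0.0.1:48484/v1/device?apikey=#{secret}", console.log)
request.get('http://127.0.0.1:48484/v1/device', { headers: { Authorization: "ApiKey #{secret}" } }, console.log)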
api = express()
unparsedRouter = express.Router()
parsedRouter = express.Router()
unauthenticatedRouter = express.Router()
parsedRouter.use(bodyParser())
parsedRouter.use(authenticate)
unparsedRouter.use(authenticate)
unparsedRouter.get '/ping', (req, res) ->
res.send('OK')
unparsedRouter.post '/v1/blink', (req, res) ->
utils.mixpanelTrack('Device blink')
utils.blink.pattern.start()
setTimeout(utils.blink.pattern.stop, 15000)
res.sendStatus(200)
parsedRouter.post '/v1/update', (req, res) ->
utils.mixpanelTrack('Update notification')
application.update(req.body.force)
res.sendStatus(204)
parsedRouter.post '/v1/reboot', (req, res) ->
force = req.body.force
Promise.map utils.getKnexApps(), (theApp) ->
Promise.using application.lockUpdates(theApp.appId, force), ->
# There's a slight chance the app changed after the previous select,
# so we fetch it again now that the lock is acquired
utils.getKnexApp(theApp.appId)
.then (app) ->
application.kill(app, removeContainer: false) if app?
.then ->
application.logSystemMessage('Rebooting', {}, 'Reboot')
device.reboot()
.then (response) ->
res.status(202).json(response)
.catch (err) ->
if err instanceof application.UpdatesLockedError
status = 423
else
status = 500
res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' })
parsedRouter.post '/v1/shutdown', (req, res) ->
force = req.body.force
Promise.map utils.getKnexApps(), (theApp) ->
Promise.using application.lockUpdates(theApp.appId, force), ->
# There's a slight chance the app changed after the previous select,
# so we fetch it again now that the lock is acquired
utils.getKnexApp(theApp.appId)
.then (app) ->
application.kill(app, removeContainer: false) if app?
.then ->
application.logSystemMessage('Shutting down', {}, 'Shutdown')
device.shutdown()
.then (response) ->
res.status(202).json(response)
.catch (err) ->
if err instanceof application.UpdatesLockedError
status = 423
else
status = 500
res.status(status).json({ Data: '', Error: err?.message or err or 'Unknown error' })
parsedRouter.post '/v1/purge', (req, res) ->
appId = req.body.appId
application.logSystemMessage('Purging /data', { appId }, 'Purge /data')
if !appId?
return res.status(400).send('Missing app id')
Promise.using application.lockUpdates(appId, true), ->
utils.getKnexApp(appId)
.then (app) ->
application.kill(app)
.then ->
new Promise (resolve, reject) ->
utils.gosuper.post('/v1/purge', { json: true, body: applicationId: appId })
.on('error', reject)
.on('response', -> resolve())
.pipe(res)
.then ->
application.logSystemMessage('Purged /data', { appId }, 'Purge /data success')
.finally ->
application.start(app)
.catch (err) ->
status = 503
if err instanceof utils.AppNotFoundError
errMsg = "App not found: an app needs to be installed for purge to work.
If you've recently moved this device from another app,
please push an app and wait for it to be installed first."
err = new Error(errMsg)
status = 400
application.logSystemMessage("Error purging /data: #{err}", { appId, error: err }, 'Purge /data error')
res.status(status).send(err?.message or err or 'Unknown error')
unparsedRouter.post '/v1/tcp-ping', (req, res) ->
utils.disableCheck(false)
res.sendStatus(204)
unparsedRouter.delete '/v1/tcp-ping', (req, res) ->
utils.disableCheck(true)
res.sendStatus(204)
parsedRouter.post '/v1/restart', (req, res) ->
appId = req.body.appId
force = req.body.force
utils.mixpanelTrack('Restart container', appId)
if !appId?
return res.status(400).send('Missing app id')
Promise.using application.lockUpdates(appId, force), ->
utils.getKnexApp(appId)
.then (app) ->
application.kill(app)
.then ->
application.start(app)
.then ->
res.status(200).send('OK')
.catch utils.AppNotFoundError, (e) ->
return res.status(400).send(e.message)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
parsedRouter.post '/v1/apps/:appId/stop', (req, res) ->
{ appId } = req.params
{ force } = req.body
utils.mixpanelTrack('Stop container', appId)
if !appId?
return res.status(400).send('Missing app id')
Promise.using application.lockUpdates(appId, force), ->
utils.getKnexApp(appId)
.then (app) ->
application.getContainerId(app)
.then (containerId) ->
application.kill(app, removeContainer: false)
.then (app) ->
res.json({ containerId })
.catch utils.AppNotFoundError, (e) ->
return res.status(400).send(e.message)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
unparsedRouter.post '/v1/apps/:appId/start', (req, res) ->
{ appId } = req.params
utils.mixpanelTrack('Start container', appId)
if !appId?
return res.status(400).send('Missing app id')
Promise.using application.lockUpdates(appId), ->
utils.getKnexApp(appId)
.tap (app) ->
application.start(app)
.then ->
application.getContainerId(app)
.then (containerId) ->
res.json({ containerId })
.catch utils.AppNotFoundError, (e) ->
return res.status(400).send(e.message)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
unparsedRouter.get '/v1/apps/:appId', (req, res) ->
{ appId } = req.params
utils.mixpanelTrack('GET app', appId)
if !appId?
return res.status(400).send('Missing app id')
Promise.using application.lockUpdates(appId, true), ->
columns = [ 'appId', 'containerName', 'commit', 'imageId', 'env' ]
utils.getKnexApp(appId, columns)
.then (app) ->
application.getContainerId(app)
.then (containerId) ->
# Don't return keys on the endpoint
app.env = _.omit(JSON.parse(app.env), config.privateAppEnvVars)
app.containerId = containerId
# Don't return data that will be of no use to the user
res.json(_.omit(app, [ 'containerName' ]))
.catch utils.AppNotFoundError, (e) ->
return res.status(400).send(e.message)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
# Expires the supervisor's API key and generates a new one.
# It also communicates the new key to the Resin API.
unparsedRouter.post '/v1/regenerate-api-key', (req, res) ->
utils.newSecret('api')
.then (secret) ->
device.updateState(api_secret: secret)
res.status(200).send(secret)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
parsedRouter.get '/v1/device/host-config', (req, res) ->
hostConfig.get()
.then (conf) ->
res.json(conf)
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
parsedRouter.patch '/v1/device/host-config', (req, res) ->
hostConfig.patch(req.body)
.then ->
res.status(200).send('OK')
.catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error')
unparsedRouter.get '/v1/device', (req, res) ->
res.json(device.getState())
unauthenticatedRouter.get '/v1/healthy', (req, res) ->
# Has the update cycle not hung? (unless we're downloading an image)
healthy = application.healthy()
# If we're connected and we know it, has the current state been reported?
healthy and= device.stateReportHealthy()
# As far as we know, is gosuper healthy?
healthy and= device.gosuperHealthy()
if healthy
res.sendStatus(200)
else
res.sendStatus(500)
api.use(unauthenticatedRouter)
api.use(unparsedRouter)
api.use(parsedRouter)
api.use(proxyvisor.router)
return api


@@ -1,71 +1,6 @@
require('log-timestamp')
process.on 'uncaughtException', (e) ->
console.error('Got unhandled exception', e, e?.stack)
Promise = require 'bluebird'
knex = require './db'
utils = require './utils'
bootstrap = require './bootstrap'
config = require './config'
Supervisor = require './supervisor'
knex.init.then ->
utils.mixpanelTrack('Supervisor start')
console.log('Starting connectivity check..')
utils.connectivityCheck()
Promise.join bootstrap.startBootstrapping(), utils.getOrGenerateSecret('api'), utils.getOrGenerateSecret('logsChannel'), (uuid, secret, logsChannel) ->
# Persist the uuid in subsequent metrics
utils.mixpanelProperties.uuid = uuid
utils.mixpanelProperties.distinct_id = uuid
api = require './api'
application = require('./application')(logsChannel, bootstrap.offlineMode)
device = require './device'
console.log('Starting API server..')
utils.createIpTablesRules()
.then ->
apiServer = api(application).listen(config.listenPort)
apiServer.timeout = config.apiTimeout
device.events.on 'shutdown', ->
apiServer.close()
bootstrap.done
.then ->
Promise.join(
device.getOSVersion()
device.getOSVariant()
(osVersion, osVariant) ->
# Let API know what version we are, and our api connection info.
console.log('Updating supervisor version and api info')
device.updateState(
api_port: config.listenPort
api_secret: secret
os_version: osVersion
os_variant: osVariant
supervisor_version: utils.supervisorVersion
provisioning_progress: null
provisioning_state: ''
download_progress: null
logs_channel: logsChannel
)
)
updateIpAddr = ->
utils.gosuper.getAsync('/v1/ipaddr', { json: true })
.spread (response, body) ->
if response.statusCode != 200 || !body.Data.IPAddresses?
throw new Error('Invalid response from gosuper')
device.reportHealthyGosuper()
device.updateState(
ip_address: body.Data.IPAddresses.join(' ')
)
.catch ->
device.reportUnhealthyGosuper()
console.log('Starting periodic check for IP addresses..')
setInterval(updateIpAddr, 30 * 1000) # Every 30s
updateIpAddr()
console.log('Starting Apps..')
application.initialize()
supervisor = new Supervisor()
supervisor.init()
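Per the hunk header (-1,71 +1,6), all of the bootstrap, API and gosuper wiring above is removed and the entry point presumably reduces to roughly the following, built from the lines kept in this hunk:

require('log-timestamp')
process.on 'uncaughtException', (e) ->
  console.error('Got unhandled exception', e, e?.stack)
Supervisor = require './supervisor'
supervisor = new Supervisor()
supervisor.init()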


@@ -1,985 +0,0 @@
_ = require 'lodash'
url = require 'url'
Lock = require 'rwlock'
knex = require './db'
config = require './config'
dockerUtils = require './docker-utils'
Promise = require 'bluebird'
utils = require './utils'
logger = require './lib/logger'
{ cachedResinApi } = require './request'
device = require './device'
lockFile = Promise.promisifyAll(require('lockfile'))
bootstrap = require './bootstrap'
TypedError = require 'typed-error'
fs = Promise.promisifyAll(require('fs'))
JSONStream = require 'JSONStream'
proxyvisor = require './proxyvisor'
{ checkInt, checkTruthy } = require './lib/validation'
osRelease = require './lib/os-release'
deviceConfig = require './device-config'
randomHexString = require './lib/random-hex-string'
UPDATE_IDLE = 0
UPDATE_UPDATING = 1
UPDATE_REQUIRED = 2
UPDATE_SCHEDULED = 3
updateStatus =
state: UPDATE_IDLE
failed: 0
forceNext: false
intervalHandle: null
lastFullUpdateCycle: process.hrtime()
currentlyDownloading: false
timeSpentPulling: 0
class UpdatesLockedError extends TypedError
ImageNotFoundError = (err) ->
return "#{err.statusCode}" is '404'
{ docker } = dockerUtils
logTypes =
stopApp:
eventName: 'Application kill'
humanName: 'Killing application'
stopAppSuccess:
eventName: 'Application stop'
humanName: 'Killed application'
stopAppNoop:
eventName: 'Application already stopped'
humanName: 'Application is already stopped, removing container'
stopRemoveAppNoop:
eventName: 'Application already stopped and container removed'
humanName: 'Application is already stopped and the container removed'
stopAppError:
eventName: 'Application stop error'
humanName: 'Failed to kill application'
downloadApp:
eventName: 'Application docker download'
humanName: 'Downloading application'
downloadAppDelta:
eventName: 'Application delta download'
humanName: 'Downloading delta for application'
downloadAppSuccess:
eventName: 'Application downloaded'
humanName: 'Downloaded application'
downloadAppError:
eventName: 'Application download error'
humanName: 'Failed to download application'
installApp:
eventName: 'Application install'
humanName: 'Installing application'
installAppSuccess:
eventName: 'Application installed'
humanName: 'Installed application'
installAppError:
eventName: 'Application install error'
humanName: 'Failed to install application'
deleteImageForApp:
eventName: 'Application image removal'
humanName: 'Deleting image for application'
deleteImageForAppSuccess:
eventName: 'Application image removed'
humanName: 'Deleted image for application'
deleteImageForAppError:
eventName: 'Application image removal error'
humanName: 'Failed to delete image for application'
imageAlreadyDeleted:
eventName: 'Image already deleted'
humanName: 'Image already deleted for application'
startApp:
eventName: 'Application start'
humanName: 'Starting application'
startAppSuccess:
eventName: 'Application started'
humanName: 'Started application'
startAppNoop:
eventName: 'Application already running'
humanName: 'Application is already running'
startAppError:
eventName: 'Application start error'
humanName: 'Failed to start application'
updateApp:
eventName: 'Application update'
humanName: 'Updating application'
updateAppError:
eventName: 'Application update error'
humanName: 'Failed to update application'
appExit:
eventName: 'Application exit'
humanName: 'Application exited'
appRestart:
eventName: 'Application restart'
humanName: 'Restarting application'
updateAppConfig:
eventName: 'Application config update'
humanName: 'Updating config for application'
updateAppConfigSuccess:
eventName: 'Application config updated'
humanName: 'Updated config for application'
updateAppConfigError:
eventName: 'Application config update error'
humanName: 'Failed to update config for application'
application = {}
application.UpdatesLockedError = UpdatesLockedError
application.localMode = false
application.healthy = ->
timeSinceLastCycle = process.hrtime(updateStatus.lastFullUpdateCycle)[0] * 1000
return bootstrap.offlineMode or updateStatus.currentlyDownloading or timeSinceLastCycle - updateStatus.timeSpentPulling <= 2 * config.appUpdatePollInterval
application.logSystemMessage = logSystemMessage = (message, obj, eventName) ->
logger.log({ m: message, s: 1 })
utils.mixpanelTrack(eventName ? message, obj)
logSystemEvent = (logType, app = {}, error) ->
message = "#{logType.humanName} '#{app.imageId}'"
if error?
# Report the message from the original cause to the user.
errMessage = error.message
if _.isEmpty(errMessage)
errMessage = error.json
if _.isEmpty(errMessage)
errMessage = error.reason
if _.isEmpty(errMessage)
errMessage = 'Unknown cause'
message += " due to '#{errMessage}'"
logSystemMessage(message, { app, error }, logType.eventName)
return
logSpecialAction = (action, value, success) ->
if success?
if success
if !value?
msg = "Cleared config variable #{action}"
else
msg = "Applied config variable #{action} = #{value}"
else
if !value?
msg = "Config variable #{action} not cleared yet"
else
msg = "Config variable #{action} = #{value} not applied yet"
else
if !value?
msg = "Clearing config variable #{action}"
else
msg = "Applying config variable #{action} = #{value}"
logSystemMessage(msg, {}, "Apply special action #{if success then "success" else "in progress"}")
application.kill = kill = (app, { updateDB = true, removeContainer = true } = {}) ->
logSystemEvent(logTypes.stopApp, app)
device.updateState(status: 'Stopping')
container = docker.getContainer(app.containerName)
container.stop(t: 10)
.then ->
container.remove(v: true) if removeContainer
return
.catch (err) ->
# Get the statusCode from the original cause and make sure statusCode is
# definitely a string for comparison reasons.
statusCode = '' + err.statusCode
# 304 means the container was already stopped - so we can just remove it
if statusCode is '304'
logSystemEvent(logTypes.stopAppNoop, app)
container.remove(v: true) if removeContainer
return
# 404 means the container doesn't exist, precisely what we want! :D
if statusCode is '404'
logSystemEvent(logTypes.stopRemoveAppNoop, app)
return
throw err
.tap ->
lockFile.unlockAsync(tmpLockPath(app))
.tap ->
device.isResinOSv1()
.then (isV1) ->
lockFile.unlockAsync(persistentLockPath(app)) if isV1
.tap ->
logSystemEvent(logTypes.stopAppSuccess, app)
if removeContainer && updateDB
app.containerName = null
knex('app').update(app).where(appId: app.appId)
.catch (err) ->
logSystemEvent(logTypes.stopAppError, app, err)
throw err
application.deleteImage = deleteImage = (app) ->
logSystemEvent(logTypes.deleteImageForApp, app)
docker.getImage(app.imageId).remove(force: true)
.then ->
logSystemEvent(logTypes.deleteImageForAppSuccess, app)
.catch ImageNotFoundError, (err) ->
logSystemEvent(logTypes.imageAlreadyDeleted, app)
.catch (err) ->
logSystemEvent(logTypes.deleteImageForAppError, app, err)
throw err
isValidPort = (port) ->
maybePort = parseInt(port, 10)
return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535
fetch = (app, { deltaSource, setDeviceUpdateState = true } = {}) ->
onProgress = (progress) ->
device.updateState(download_progress: progress.percentage)
startTime = process.hrtime()
docker.getImage(app.imageId).inspect()
.catch ImageNotFoundError, ->
updateStatus.currentlyDownloading = true
device.updateState(status: 'Downloading', download_progress: 0)
Promise.try ->
conf = JSON.parse(app.config)
Promise.join utils.getConfig('apiKey'), utils.getConfig('uuid'), (apiKey, uuid) ->
if checkTruthy(conf['RESIN_SUPERVISOR_DELTA'])
logSystemEvent(logTypes.downloadAppDelta, app)
deltaOpts = {
uuid, apiKey, deltaSource
# Use user-defined timeouts, but fall back to defaults if none are provided.
requestTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_REQUEST_TIMEOUT'], positive: true)
applyTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_APPLY_TIMEOUT'], positive: true)
retryCount: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_COUNT'], positive: true)
retryInterval: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL'], positive: true)
}
dockerUtils.rsyncImageWithProgress(app.imageId, deltaOpts, onProgress)
else
logSystemEvent(logTypes.downloadApp, app)
dockerUtils.fetchImageWithProgress(app.imageId, onProgress, { uuid, apiKey })
.then ->
logSystemEvent(logTypes.downloadAppSuccess, app)
device.updateState(status: 'Idle', download_progress: null)
device.setUpdateState(update_downloaded: true) if setDeviceUpdateState
docker.getImage(app.imageId).inspect()
.catch (err) ->
logSystemEvent(logTypes.downloadAppError, app, err)
throw err
.finally ->
updateStatus.timeSpentPulling += process.hrtime(startTime)[0]
updateStatus.currentlyDownloading = false
shouldMountKmod = (image) ->
device.isResinOSv1().then (isV1) ->
return false if not isV1
Promise.using docker.imageRootDirMounted(image), (rootDir) ->
osRelease.getOSVersion(rootDir + '/etc/os-release')
.then (version) ->
return version? and /^(Debian|Raspbian)/i.test(version)
.catch (err) ->
console.error('Error getting app OS release: ', err)
return false
isExecFormatError = (err) ->
message = ''
try
message = err.json.trim()
/exec format error$/.test(message)
generateContainerName = (app) ->
randomHexString.generate()
.then (randomString) ->
sanitisedAppName = (app.name ? 'app').replace(/[^a-zA-Z0-9]+/g, '_')
return "#{sanitisedAppName}_#{randomString}"
application.start = start = (app) ->
device.isResinOSv1().then (isV1) ->
volumes = utils.defaultVolumes(isV1)
binds = utils.defaultBinds(app.appId, isV1)
alreadyStarted = false
Promise.try ->
# Parse the env vars before trying to access them; they have to be stringified for knex.
return [ JSON.parse(app.env), JSON.parse(app.config) ]
.spread (env, conf) ->
if env.PORT?
portList = env.PORT
.split(',')
.map((port) -> port.trim())
.filter(isValidPort)
if app.containerName?
# If we have a container name then check it exists and if so use it.
container = docker.getContainer(app.containerName)
containerPromise = container.inspect().return(container)
else
containerPromise = Promise.rejected()
# If there is no existing container then create one instead.
containerPromise.catch ->
# If the image is not available, we'll let the update cycle fix it
docker.getImage(app.imageId).inspect()
.then (imageInfo) ->
logSystemEvent(logTypes.installApp, app)
device.updateState(status: 'Installing')
generateContainerName(app)
.tap (name) ->
app.containerName = name
knex('app').update(app).where(appId: app.appId)
.then (affectedRows) ->
knex('app').insert(app) if affectedRows == 0
.then (name) ->
ports = {}
portBindings = {}
if portList?
portList.forEach (port) ->
ports[port + '/tcp'] = {}
portBindings[port + '/tcp'] = [ HostPort: port ]
if imageInfo?.Config?.Cmd
cmd = imageInfo.Config.Cmd
else
cmd = [ '/bin/bash', '-c', '/start' ]
restartPolicy = createRestartPolicy({ name: conf['RESIN_APP_RESTART_POLICY'], maximumRetryCount: conf['RESIN_APP_RESTART_RETRIES'] })
shouldMountKmod(app.imageId)
.then (shouldMount) ->
binds.push('/bin/kmod:/bin/kmod:ro') if shouldMount
docker.createContainer(
name: name
Image: app.imageId
Cmd: cmd
Tty: true
Volumes: volumes
Env: _.map env, (v, k) -> k + '=' + v
ExposedPorts: ports
HostConfig:
Privileged: true
NetworkMode: 'host'
PortBindings: portBindings
Binds: binds
RestartPolicy: restartPolicy
)
.tap ->
logSystemEvent(logTypes.installAppSuccess, app)
.catch (err) ->
logSystemEvent(logTypes.installAppError, app, err)
throw err
.tap (container) ->
logSystemEvent(logTypes.startApp, app)
device.updateState(status: 'Starting')
container.start()
.catch (err) ->
statusCode = '' + err.statusCode
# 304 means the container was already started, precisely what we want :)
if statusCode is '304'
alreadyStarted = true
return
if statusCode is '500' and isExecFormatError(err)
# Provide a friendlier error message for "exec format error"
device.getDeviceType()
.then (deviceType) ->
throw new Error("Application architecture incompatible with #{deviceType}: exec format error")
else
# rethrow the same error
throw err
.catch (err) ->
# If starting the container failed, we remove it so that it doesn't litter
container.remove(v: true)
.finally ->
throw err
.then ->
device.updateState(commit: app.commit)
logger.attach(app)
.catch (err) ->
# If creating or starting the app failed, we soft-delete it so that
# the next update cycle doesn't think the app is up to date
app.containerName = null
app.markedForDeletion = true
knex('app').update(app).where(appId: app.appId)
.finally ->
logSystemEvent(logTypes.startAppError, app, err)
throw err
.tap ->
if alreadyStarted
logSystemEvent(logTypes.startAppNoop, app)
else
logSystemEvent(logTypes.startAppSuccess, app)
.finally ->
device.updateState(status: 'Idle')
validRestartPolicies = [ 'no', 'always', 'on-failure', 'unless-stopped' ]
# Construct a restart policy based on its name and maximumRetryCount.
# Both arguments are optional, and the default policy is "always".
#
# Throws exception if an invalid policy name is given.
# Returns a RestartPolicy { Name, MaximumRetryCount } object
createRestartPolicy = ({ name, maximumRetryCount }) ->
if not name?
name = 'always'
if not (name in validRestartPolicies)
throw new Error("Invalid restart policy: #{name}")
policy = { Name: name }
if name is 'on-failure' and maximumRetryCount?
policy.MaximumRetryCount = maximumRetryCount
return policy
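Worked examples of the policy construction above:

# createRestartPolicy({ name: 'on-failure', maximumRetryCount: 3 })
#   => { Name: 'on-failure', MaximumRetryCount: 3 }
# createRestartPolicy({})
#   => { Name: 'always' }
# createRestartPolicy({ name: 'sometimes' })
#   => throws Error('Invalid restart policy: sometimes')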
persistentLockPath = (app) ->
appId = app.appId ? app
return "/mnt/root#{config.dataPath}/#{appId}/resin-updates.lock"
tmpLockPath = (app) ->
appId = app.appId ? app
return "/mnt/root/tmp/resin-supervisor/#{appId}/resin-updates.lock"
killmePath = (app) ->
appId = app.appId ? app
return "/mnt/root#{config.dataPath}/#{appId}/resin-kill-me"
# At boot, all apps should be unlocked *before* start to prevent a deadlock
application.unlockAndStart = unlockAndStart = (app) ->
lockFile.unlockAsync(persistentLockPath(app))
.then ->
start(app)
ENOENT = (err) -> err.code is 'ENOENT'
application.lockUpdates = lockUpdates = do ->
_lock = new Lock()
_writeLock = Promise.promisify(_lock.async.writeLock)
return (app, force) ->
device.isResinOSv1()
.then (isV1) ->
persistentLockName = persistentLockPath(app)
tmpLockName = tmpLockPath(app)
_writeLock(tmpLockName)
.tap (release) ->
if isV1 and force != true
lockFile.lockAsync(persistentLockName)
.catch ENOENT, _.noop
.catch (err) ->
release()
throw new UpdatesLockedError("Updates are locked: #{err.message}")
.tap (release) ->
if force != true
lockFile.lockAsync(tmpLockName)
.catch ENOENT, _.noop
.catch (err) ->
Promise.try ->
lockFile.unlockAsync(persistentLockName) if isV1
.finally ->
release()
throw new UpdatesLockedError("Updates are locked: #{err.message}")
.disposer (release) ->
Promise.try ->
lockFile.unlockAsync(tmpLockName) if force != true
.then ->
lockFile.unlockAsync(persistentLockName) if isV1 and force != true
.finally ->
release()
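lockUpdates returns a bluebird disposer, so callers wrap their work in Promise.using and the locks are released even when that work throws; this is the pattern used by the API handlers and the update strategies in this file:

# Promise.using lockUpdates(app, force), ->
#   utils.getKnexApp(app.appId)
#   .then(kill)
#   .then ->
#     start(app)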
joinErrorMessages = (failures) ->
s = if failures.length > 1 then 's' else ''
messages = _.map failures, (err) ->
err.message or err
"#{failures.length} error#{s}: #{messages.join(' - ')}"
# Function to start the application update polling
application.poll = ->
updateStatus.intervalHandle = setInterval(->
application.update()
, config.appUpdatePollInterval)
# Callback function to set the API poll interval dynamically.
apiPollInterval = (val) ->
config.appUpdatePollInterval = checkInt(val, positive: true) ? 60000
console.log('New API poll interval: ' + val)
clearInterval(updateStatus.intervalHandle)
application.poll()
return true
setLocalMode = (val) ->
mode = checkTruthy(val) ? false
device.getOSVariant()
.then (variant) ->
if variant isnt 'dev'
logSystemMessage('Not a development OS, ignoring local mode', {}, 'Ignore local mode')
return
Promise.try ->
if mode and !application.localMode
logSystemMessage('Entering local mode, app will be forcefully stopped', {}, 'Enter local mode')
Promise.map utils.getKnexApps(), (theApp) ->
Promise.using application.lockUpdates(theApp.appId, true), ->
# There's a slight chance the app changed after the previous select,
# so we fetch it again now that the lock is acquired
utils.getKnexApp(theApp.appId)
.then (app) ->
application.kill(app) if app?
else if !mode and application.localMode
logSystemMessage('Exiting local mode, app will be resumed', {}, 'Exit local mode')
Promise.map utils.getKnexApps(), (app) ->
unlockAndStart(app)
.then ->
application.localMode = mode
.return(true)
specialActionConfigVars = [
[ 'RESIN_SUPERVISOR_LOCAL_MODE', setLocalMode ]
[ 'RESIN_SUPERVISOR_VPN_CONTROL', utils.vpnControl ]
[ 'RESIN_SUPERVISOR_CONNECTIVITY_CHECK', utils.enableConnectivityCheck ]
[ 'RESIN_SUPERVISOR_POLL_INTERVAL', apiPollInterval ]
[ 'RESIN_SUPERVISOR_LOG_CONTROL', utils.resinLogControl ]
]
specialActionConfigKeys = _.map(specialActionConfigVars, 0)
executedSpecialActionConfigVars = {}
executeSpecialActionsAndHostConfig = (conf, oldConf, opts) ->
updatedValues = _.pickBy _.clone(oldConf), (val, key) ->
_.startsWith(key, device.hostConfigConfigVarPrefix) or _.includes(specialActionConfigKeys, key)
needsReboot = false
Promise.mapSeries specialActionConfigVars, ([ key, specialActionCallback ]) ->
# This makes the Special Action Envs only trigger their functions once.
if specialActionCallback? and executedSpecialActionConfigVars[key] != conf[key]
logSpecialAction(key, conf[key])
Promise.try ->
specialActionCallback(conf[key], logSystemMessage, opts)
.then (updated) ->
if updated
updatedValues[key] = conf[key]
executedSpecialActionConfigVars[key] = conf[key]
logSpecialAction(key, conf[key], updated)
.then ->
hostConfigVars = _.pickBy conf, (val, key) ->
return _.startsWith(key, device.hostConfigConfigVarPrefix)
oldHostConfigVars = _.pickBy oldConf, (val, key) ->
return _.startsWith(key, device.hostConfigConfigVarPrefix)
if !_.isEqual(hostConfigVars, oldHostConfigVars)
device.setHostConfig(hostConfigVars, oldHostConfigVars, logSystemMessage)
.then (changedHostConfig) ->
needsReboot = changedHostConfig
if changedHostConfig
for own key of oldHostConfigVars
delete updatedValues[key]
_.assign(updatedValues, hostConfigVars)
.then ->
return { updatedValues, needsReboot }
getAndApplyDeviceConfig = ({ initial = false } = {}) ->
deviceConfig.get()
.then ({ values, targetValues }) ->
executeSpecialActionsAndHostConfig(targetValues, values, { initial })
.tap ({ updatedValues }) ->
deviceConfig.set({ values: updatedValues }) if !_.isEqual(values, updatedValues)
.then ({ needsReboot }) ->
if needsReboot
logSystemMessage('Rebooting', {}, 'Reboot')
Promise.delay(1000)
.then ->
device.reboot()
wrapAsError = (err) ->
return err if _.isError(err)
return new Error(err.message ? err)
# Wait for the app to signal it's ready to die, or for the timeout to elapse.
# The timeout defaults to 1 minute.
waitToKill = (app, timeout) ->
startTime = Date.now()
pollInterval = 100
timeout = checkInt(timeout, positive: true) ? 60000
checkFileOrTimeout = ->
fs.statAsync(killmePath(app))
.catch (err) ->
throw err unless (Date.now() - startTime) > timeout
.then ->
fs.unlinkAsync(killmePath(app)).catch(_.noop)
retryCheck = ->
checkFileOrTimeout()
.catch ->
Promise.delay(pollInterval).then(retryCheck)
retryCheck()
updateStrategies =
'download-then-kill': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.try ->
fetch(app, { deltaSource }) if needsDownload
.then ->
Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
utils.getKnexApp(localApp.appId)
.then(kill)
.then ->
start(app)
.catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err
'kill-then-download': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
utils.getKnexApp(localApp.appId)
.then(kill)
.then ->
fetch(app, { deltaSource }) if needsDownload
.then ->
start(app)
.catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err
'delete-then-download': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
utils.getKnexApp(localApp.appId)
.tap(kill)
.then (appFromDB) ->
# If we don't need to download a new image,
# there's no use in deleting the image
if needsDownload
deleteImage(appFromDB)
.then ->
fetch(app, { deltaSource })
.then ->
start(app)
.catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err
'hand-over': ({ localApp, app, needsDownload, force, timeout, deltaSource }) ->
Promise.using lockUpdates(localApp, force), ->
utils.getKnexApp(localApp.appId)
.then (localApp) ->
Promise.try ->
fetch(app, { deltaSource }) if needsDownload
.then ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
start(app)
.then ->
waitToKill(localApp, timeout)
.then ->
kill(localApp, updateDB: false)
.catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err
updateUsingStrategy = (strategy, options) ->
if not _.has(updateStrategies, strategy)
strategy = 'download-then-kill'
updateStrategies[strategy](options)
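The strategy name comes from the app's RESIN_SUPERVISOR_UPDATE_STRATEGY config variable (see the call in application.update below); any missing or unrecognized value falls back to download-then-kill. For example:

# updateUsingStrategy(undefined, opts)    # runs 'download-then-kill'
# updateUsingStrategy('hand-over', opts)  # starts the new container, waits for the old
#                                         # one to create resin-kill-me (or for the
#                                         # handover timeout), then kills the old one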
getRemoteState = (uuid, apiKey) ->
endpoint = url.resolve(config.apiEndpoint, "/device/v1/#{uuid}/state")
requestParams = _.extend
method: 'GET'
url: "#{endpoint}?&apikey=#{apiKey}"
, cachedResinApi.passthrough
cachedResinApi._request(requestParams)
.timeout(config.apiTimeout)
.catch (err) ->
console.error("Failed to get state for device #{uuid}. #{err}")
throw err
# TODO: Actually store and use app.environment and app.config separately
parseEnvAndFormatRemoteApps = (remoteApps, uuid, deviceApiKey) ->
appsWithEnv = _.mapValues remoteApps, (app, appId) ->
utils.extendEnvVars(app.environment, uuid, deviceApiKey, appId, app.name, app.commit)
.then (env) ->
app.config ?= {}
return {
appId
commit: app.commit
imageId: app.image
env: JSON.stringify(env)
config: JSON.stringify(app.config)
name: app.name
markedForDeletion: false
}
Promise.props(appsWithEnv)
formatLocalApps = (apps) ->
apps = _.keyBy(apps, 'appId')
localApps = _.mapValues apps, (app) ->
app = _.pick(app, [ 'appId', 'commit', 'imageId', 'env', 'config', 'name', 'markedForDeletion' ])
app.markedForDeletion = checkTruthy(app.markedForDeletion)
return app
return localApps
restartVars = (conf) ->
return _.pick(conf, [ 'RESIN_DEVICE_RESTART', 'RESIN_RESTART' ])
compareForUpdate = (localApps, remoteApps) ->
remoteAppIds = _.keys(remoteApps)
localAppIds = _.keys(localApps)
toBeRemoved = _.difference(localAppIds, remoteAppIds)
toBeInstalled = _.difference(remoteAppIds, localAppIds)
matchedAppIds = _.intersection(remoteAppIds, localAppIds)
toBeUpdated = _.filter matchedAppIds, (appId) ->
localApp = _.omit(localApps[appId], 'config')
remoteApp = _.omit(remoteApps[appId], 'config')
localApp.env = _.omit(JSON.parse(localApp.env), 'RESIN_DEVICE_NAME_AT_INIT')
remoteApp.env = _.omit(JSON.parse(remoteApp.env), 'RESIN_DEVICE_NAME_AT_INIT')
return !_.isEqual(remoteApp, localApp) or
!_.isEqual(restartVars(JSON.parse(localApps[appId].config)), restartVars(JSON.parse(remoteApps[appId].config)))
appsWithUpdatedConfigs = _.filter matchedAppIds, (appId) ->
return !_.includes(toBeUpdated, appId) and
!_.isEqual(localApps[appId].config, remoteApps[appId].config)
toBeDownloaded = _.filter toBeUpdated, (appId) ->
return !_.isEqual(remoteApps[appId].imageId, localApps[appId].imageId)
toBeDownloaded = _.union(toBeDownloaded, toBeInstalled)
# The order in this _.union ensures that installations (new appIds) go last, so
# they will happen *after* removals.
allAppIds = _.union(localAppIds, remoteAppIds)
return { toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds }
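A worked example of the comparison above, with hypothetical app IDs and images:

# localApps:  { '1': {...}, '2': { imageId: 'img-a', ... } }
# remoteApps: { '2': { imageId: 'img-b', ... }, '3': {...} }
# result:
#   toBeRemoved:    [ '1' ]
#   toBeInstalled:  [ '3' ]
#   toBeUpdated:    [ '2' ]             # something about app 2 differs
#   toBeDownloaded: [ '2', '3' ]        # changed image plus all new installs
#   allAppIds:      [ '1', '2', '3' ]   # removals ordered before installs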
application.update = update = (force, scheduled = false) ->
switch updateStatus.state
when UPDATE_SCHEDULED
if scheduled isnt true
# There's an update scheduled but it isn't this one, so just stop.
# If we have to force an update, the scheduled one will do it.
updateStatus.forceNext or= force
return
when UPDATE_IDLE
# All good, carry on with the update.
else
# Mark an update required after the current in-progress update.
updateStatus.forceNext or= force
updateStatus.state = UPDATE_REQUIRED
return
force or= updateStatus.forceNext
updateStatus.forceNext = false
updateStatus.state = UPDATE_UPDATING
bootstrap.done.then ->
Promise.join utils.getConfig('apiKey'), utils.getConfig('uuid'), utils.getConfig('name'), knex('app').select(), (apiKey, uuid, deviceName, apps) ->
getRemoteState(uuid, apiKey)
.then ({ local, dependent }) ->
proxyvisor.fetchAndSetTargetsForDependentApps(dependent, fetch, apiKey)
.then ->
utils.setConfig('name', local.name) if local.name != deviceName
.then ->
utils.getConfig('deviceApiKey')
.then (deviceApiKey) ->
parseEnvAndFormatRemoteApps(local.apps, uuid, deviceApiKey)
.then (remoteApps) ->
localApps = formatLocalApps(apps)
resourcesForUpdate = compareForUpdate(localApps, remoteApps)
{ toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds } = resourcesForUpdate
if !_.isEmpty(toBeRemoved) or !_.isEmpty(toBeInstalled) or !_.isEmpty(toBeUpdated)
device.setUpdateState(update_pending: true)
deltaSource = {}
Promise.map toBeDownloaded, (appId) ->
if _.includes(toBeUpdated, appId)
deltaSource[appId] = localApps[appId].imageId
else if !_.isEmpty(apps) # apps is localApps as an array
deltaSource[appId] = apps[0].imageId
.then ->
# Before running the updates, try to clean up any images that aren't in use
# and will not be used in the target state
return if application.localMode or device.shuttingDown
dockerUtils.cleanupContainersAndImages(_.map(remoteApps, 'imageId').concat(_.values(deltaSource)))
.catch (err) ->
console.log('Cleanup failed: ', err, err.stack)
.then ->
remoteDeviceConfig = {}
_.map remoteAppIds, (appId) ->
_.merge(remoteDeviceConfig, JSON.parse(remoteApps[appId].config))
remoteDeviceHostConfig = _.pickBy(remoteDeviceConfig, (val, key) -> _.startsWith(key, device.hostConfigConfigVarPrefix))
remoteSpecialActionValues = _.pickBy(remoteDeviceConfig, (val, key) -> _.includes(specialActionConfigKeys, key))
# This prevents saving values like RESIN_SUPERVISOR_DELTA, that (for now) actually belong to app config, to the deviceConfig table
relevantDeviceConfig = _.assign({}, remoteDeviceHostConfig, remoteSpecialActionValues)
deviceConfig.get()
.then ({ values, targetValues }) ->
# If the new device config is different from the target values we had, or if
# for some reason it hasn't been applied yet (values don't match target), we apply it.
if !_.isEqual(targetValues, remoteDeviceConfig) or !_.isEqual(targetValues, values) or _.some(remoteSpecialActionValues, (v, k) -> executedSpecialActionConfigVars[k] != v)
deviceConfig.set({ targetValues: relevantDeviceConfig })
.then ->
getAndApplyDeviceConfig()
.catch (err) ->
logSystemMessage("Error fetching/applying device configuration: #{err}", { error: err }, 'Set device configuration error')
.return(allAppIds)
.mapSeries (appId) ->
# The sorting of allAppIds from compareForUpdate ensures that toBeRemoved is
# done before toBeInstalled, which is the behavior we want.
return if application.localMode or device.shuttingDown
Promise.try ->
needsDownload = _.includes(toBeDownloaded, appId)
if _.includes(toBeRemoved, appId)
app = localApps[appId]
Promise.using lockUpdates(app, force), ->
# We get the app from the DB again in case someone restarted it
# (which would have changed its containerName)
utils.getKnexApp(appId)
.then(kill)
.then ->
knex('app').where('appId', appId).update(markedForDeletion: true)
.catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err)
throw err
else if _.includes(toBeInstalled, appId)
app = remoteApps[appId]
Promise.try ->
fetch(app, { deltaSource: deltaSource[appId] }) if needsDownload
.then ->
start(app)
else if _.includes(toBeUpdated, appId)
app = remoteApps[appId]
conf = JSON.parse(app.config)
forceThisApp =
checkTruthy(conf['RESIN_SUPERVISOR_OVERRIDE_LOCK']) ||
checkTruthy(conf['RESIN_OVERRIDE_LOCK'])
strategy = conf['RESIN_SUPERVISOR_UPDATE_STRATEGY']
timeout = conf['RESIN_SUPERVISOR_HANDOVER_TIMEOUT']
updateUsingStrategy strategy, {
localApp: localApps[appId]
app
needsDownload
force: force || forceThisApp
timeout
deltaSource: deltaSource[appId]
}
else if _.includes(appsWithUpdatedConfigs, appId)
# These apps have no changes other than config variables.
# Notably, this can affect setting the dependent devices hook address
# when nothing else changes in the app, so we just save them.
app = remoteApps[appId]
logSystemEvent(logTypes.updateAppConfig, app)
knex('app').update(app).where({ appId })
.then ->
logSystemEvent(logTypes.updateAppConfigSuccess, app)
.catch (err) ->
logSystemEvent(logTypes.updateAppConfigError, app, err)
throw err
.catch(wrapAsError)
.filter(_.isError)
.then (failures) ->
_.each(failures, (err) -> console.error('Error:', err, err.stack))
throw new Error(joinErrorMessages(failures)) if failures.length > 0
.then ->
proxyvisor.sendUpdates() if !device.shuttingDown
.then ->
return if application.localMode or device.shuttingDown
updateStatus.failed = 0
device.setUpdateState(update_pending: false, update_downloaded: false, update_failed: false)
# We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a
# point where we might clean up an image we still want.
knex('app').where(markedForDeletion: true).del()
.then ->
dockerUtils.cleanupContainersAndImages()
.catch (err) ->
updateStatus.failed++
device.setUpdateState(update_failed: true)
if updateStatus.state in [ UPDATE_REQUIRED, UPDATE_SCHEDULED ]
console.log('Updating failed, but there is already another update scheduled immediately: ', err)
return
delayTime = Math.min((2 ** updateStatus.failed) * 500, config.appUpdatePollInterval)
# If there was an error then schedule another attempt briefly in the future.
console.log('Scheduling another update attempt due to failure: ', delayTime, err)
setTimeout(update, delayTime, force, true)
updateStatus.state = UPDATE_SCHEDULED
.finally ->
switch updateStatus.state
when UPDATE_REQUIRED
# If an update is required then schedule it
setTimeout(update, 1, false, true)
updateStatus.state = UPDATE_SCHEDULED
when UPDATE_SCHEDULED
# Already scheduled, nothing to do here
else
updateStatus.state = UPDATE_IDLE
device.updateState(status: 'Idle')
updateStatus.lastFullUpdateCycle = process.hrtime()
updateStatus.timeSpentPulling = 0
return
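The retry delay in the catch handler above doubles with each consecutive failure but is capped at the poll interval; with the 60000 ms fallback value used elsewhere in this file for appUpdatePollInterval:

# failed = 1  ->  delay = min(2**1 * 500, 60000) = 1000 ms
# failed = 2  ->  2000 ms
# failed = 3  ->  4000 ms
# failed >= 7 ->  capped at 60000 ms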
sanitiseContainerName = (name) -> name.replace(/^\//, '')
listenToEvents = do ->
appHasDied = {}
return ->
docker.getEvents()
.then (stream) ->
stream.on 'error', (err) ->
console.error('Error on docker events stream:', err, err.stack)
parser = JSONStream.parse()
parser.on 'error', (err) ->
console.error('Error on docker events JSON stream:', err, err.stack)
parser.on 'data', (data) ->
if data?.Type? && data.Type == 'container' && data.status in ['die', 'start'] && data.Actor?.Attributes?.Name?
knex('app').select().where({ containerName: sanitiseContainerName(data.Actor.Attributes.Name) })
.then ([ app ]) ->
if app?
if data.status == 'die'
logSystemEvent(logTypes.appExit, app)
appHasDied[app.containerName] = true
else if data.status == 'start' and appHasDied[app.containerName]
logSystemEvent(logTypes.appRestart, app)
logger.attach(app)
.catch (err) ->
console.error('Error on docker event:', err, err.stack)
parser.on 'end', ->
console.error('Docker events stream ended, this should never happen')
listenToEvents()
stream.pipe(parser)
.catch (err) ->
console.error('Error listening to events:', err, err.stack)
migrateContainerIdApps = ->
knex.schema.hasColumn('app', 'containerId')
.then (exists) ->
return if not exists
knex('app').whereNotNull('containerId').select()
.then (apps) ->
return if !apps? or apps.length == 0
Promise.map apps, (app) ->
docker.getContainer(app.containerId).inspect()
.catchReturn({ Name: null })
.then (container) ->
if container.Name?
app.containerName = sanitiseContainerName(container.Name)
else
app.containerName = null
app.containerId = null
knex('app').update(app).where({ id: app.id })
.then ->
knex.schema.table 'app', (table) ->
table.dropColumn('containerId')
application.getContainerId = (app) ->
docker.getContainer(app.containerName).inspect()
.catchReturn({ Id: null })
.then (container) ->
return container.Id
application.initialize = ->
listenToEvents()
migrateContainerIdApps()
.then ->
getAndApplyDeviceConfig(initial: true)
.then ->
knex('app').whereNot(markedForDeletion: true).orWhereNull('markedForDeletion').select()
.map (app) ->
unlockAndStart(app) if !application.localMode and !device.shuttingDown
.catch (error) ->
console.error('Error starting apps:', error)
.then ->
utils.mixpanelTrack('Start application update poll', { interval: config.appUpdatePollInterval })
application.poll()
application.update()
module.exports = (logsChannel, offlineMode) ->
logger.init(
pubnub: config.pubnub
channel: "device-#{logsChannel}-logs"
offlineMode: offlineMode
)
return application


@@ -1,275 +0,0 @@
Promise = require 'bluebird'
knex = require './db'
utils = require './utils'
deviceRegister = require 'resin-register-device'
{ resinApi, request } = require './request'
fs = Promise.promisifyAll(require('fs'))
config = require './config'
appsPath = '/boot/apps.json'
_ = require 'lodash'
deviceConfig = require './device-config'
TypedError = require 'typed-error'
osRelease = require './lib/os-release'
semver = require 'semver'
semverRegex = require('semver-regex')
configJson = require './config-json'
DuplicateUuidError = (err) -> _.startsWith(err.message, '"uuid" must be unique')
exports.ExchangeKeyError = class ExchangeKeyError extends TypedError
bootstrapper = {}
loadPreloadedApps = ->
devConfig = {}
knex('app').select()
.then (apps) ->
if apps.length > 0
console.log('Preloaded apps already loaded, skipping')
return
configJson.getAll()
.then (userConfig) ->
fs.readFileAsync(appsPath, 'utf8')
.then(JSON.parse)
.map (app) ->
utils.extendEnvVars(app.env, userConfig.uuid, userConfig.deviceApiKey, app.appId, app.name, app.commit)
.then (extendedEnv) ->
app.env = JSON.stringify(extendedEnv)
app.markedForDeletion = false
_.merge(devConfig, app.config)
app.config = JSON.stringify(app.config)
knex('app').insert(app)
.then ->
deviceConfig.set({ targetValues: devConfig })
.catch (err) ->
utils.mixpanelTrack('Loading preloaded apps failed', { error: err })
fetchDevice = (uuid, apiKey) ->
resinApi.get
resource: 'device'
options:
filter:
uuid: uuid
customOptions:
apikey: apiKey
.get(0)
.catchReturn(null)
.timeout(config.apiTimeout)
exchangeKey = ->
configJson.getAll()
.then (userConfig) ->
Promise.try ->
# If we have an existing device key we first check if it's valid, because if it is we can just use that
if userConfig.deviceApiKey?
fetchDevice(userConfig.uuid, userConfig.deviceApiKey)
.then (device) ->
if device?
return device
# If it's not valid/doesn't exist then we try to use the user/provisioning api key for the exchange
fetchDevice(userConfig.uuid, userConfig.apiKey)
.then (device) ->
if not device?
throw new ExchangeKeyError("Couldn't fetch device with provisioning key")
# We found the device, we can try to register a working device key for it
Promise.try ->
if !userConfig.deviceApiKey?
deviceApiKey = deviceRegister.generateUniqueKey()
configJson.set({ deviceApiKey })
.return(deviceApiKey)
else
return userConfig.deviceApiKey
.then (deviceApiKey) ->
request.postAsync("#{config.apiEndpoint}/api-key/device/#{device.id}/device-key?apikey=#{userConfig.apiKey}", {
json: true
body:
apiKey: deviceApiKey
})
.spread (res, body) ->
if res.statusCode != 200
throw new ExchangeKeyError("Couldn't register device key with provisioning key")
.return(device)
bootstrap = ->
configJson.get('deviceType')
.then (deviceType) ->
if !deviceType?
configJson.set(deviceType: 'raspberry-pi')
.then ->
configJson.getAll()
.then (userConfig) ->
if userConfig.registered_at?
return userConfig
deviceRegister.register(
userId: userConfig.userId
applicationId: userConfig.applicationId
uuid: userConfig.uuid
deviceType: userConfig.deviceType
deviceApiKey: userConfig.deviceApiKey
provisioningApiKey: userConfig.apiKey
apiEndpoint: config.apiEndpoint
)
.timeout(config.apiTimeout)
.catch DuplicateUuidError, ->
console.log('UUID already registered, trying a key exchange')
exchangeKey()
.tap ->
console.log('Key exchange succeeded, all good')
.tapCatch ExchangeKeyError, (err) ->
# If it fails we just have to reregister, as a provisioning key doesn't have the ability to change existing devices
console.log('Exchanging key failed, having to reregister')
generateRegistration(true)
.then ({ id }) ->
toUpdate = {}
toDelete = []
if !userConfig.registered_at?
toUpdate.registered_at = Date.now()
toUpdate.deviceId = id
osRelease.getOSVersion(config.hostOSVersionPath)
.then (osVersion) ->
# Delete the provisioning key now, only if the OS supports it
hasSupport = hasDeviceApiKeySupport(osVersion)
if hasSupport
toDelete.push('apiKey')
else
toUpdate.apiKey = userConfig.deviceApiKey
configJson.set(toUpdate, toDelete)
.then ->
configJson.getAll()
.then (userConfig) ->
console.log('Finishing bootstrapping')
knex('config').whereIn('key', ['uuid', 'apiKey', 'username', 'userId', 'version']).delete()
.then ->
knex('config').insert([
{ key: 'uuid', value: userConfig.uuid }
# We use the provisioning/user `apiKey` if it still exists because if it does it means we were already registered
# using that key and have to rely on the exchange key mechanism to swap the keys as appropriate later
{ key: 'apiKey', value: userConfig.apiKey ? userConfig.deviceApiKey }
{ key: 'deviceApiKey', value: userConfig.deviceApiKey }
{ key: 'username', value: userConfig.username }
{ key: 'userId', value: userConfig.userId }
{ key: 'version', value: utils.supervisorVersion }
])
.tap ->
bootstrapper.doneBootstrapping()
generateRegistration = (forceReregister = false) ->
Promise.try ->
if forceReregister
configJson.set({ uuid: deviceRegister.generateUniqueKey(), deviceApiKey: deviceRegister.generateUniqueKey() })
else
configJson.getAll()
.then ({ uuid, deviceApiKey }) ->
uuid ?= deviceRegister.generateUniqueKey()
deviceApiKey ?= deviceRegister.generateUniqueKey()
configJson.set({ uuid, deviceApiKey })
.then ->
configJson.get('uuid')
.catch (err) ->
console.log('Error generating and saving UUID: ', err)
Promise.delay(config.bootstrapRetryDelay)
.then ->
generateRegistration()
bootstrapOrRetry = ->
utils.mixpanelTrack('Device bootstrap')
# If we're in offline mode, we don't start the provisioning process so bootstrap.done will never fulfill
return if bootstrapper.offlineMode
bootstrap().catch (err) ->
utils.mixpanelTrack('Device bootstrap failed, retrying', { error: err, delay: config.bootstrapRetryDelay })
setTimeout(bootstrapOrRetry, config.bootstrapRetryDelay)
hasDeviceApiKeySupport = (osVersion) ->
try
!/^Resin OS /.test(osVersion) or semver.gte(semverRegex().exec(osVersion)[0], '2.0.2')
catch err
console.error('Unable to determine if device has deviceApiKey support', err, err.stack)
false
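Examples of the check above:

# hasDeviceApiKeySupport('Resin OS 2.3.0+rev1')  =>  true   (2.3.0 >= 2.0.2)
# hasDeviceApiKeySupport('Resin OS 2.0.0')       =>  false
# hasDeviceApiKeySupport('Debian GNU/Linux 9')   =>  true   (non-Resin OS, assumed supported)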
exchangeKeyAndUpdateConfig = ->
# Only do a key exchange and delete the provisioning key if we're on a Resin OS version
# that supports using the deviceApiKey (2.0.2 and above)
# or if we're in a non-Resin OS (which is assumed to be updated enough).
# Otherwise VPN and other host services that use an API key will break.
#
# In other cases, we make the apiKey equal the deviceApiKey instead.
Promise.join(
configJson.getAll()
osRelease.getOSVersion(config.hostOSVersionPath)
(userConfig, osVersion) ->
hasSupport = hasDeviceApiKeySupport(osVersion)
if hasSupport or userConfig.apiKey != userConfig.deviceApiKey
console.log('Attempting key exchange')
exchangeKey()
.then ->
configJson.get('deviceApiKey')
.then (deviceApiKey) ->
console.log('Key exchange succeeded, starting to use deviceApiKey')
utils.setConfig('deviceApiKey', deviceApiKey)
.then ->
utils.setConfig('apiKey', deviceApiKey)
.then ->
if hasSupport
configJson.set({}, [ 'apiKey' ])
else
configJson.set(apiKey: deviceApiKey)
)
exchangeKeyOrRetry = do ->
_failedExchanges = 0
return ->
exchangeKeyAndUpdateConfig()
.catch (err) ->
console.error('Error exchanging API key, will retry', err, err.stack)
delay = Math.min((2 ** _failedExchanges) * config.bootstrapRetryDelay, 24 * 60 * 60 * 1000)
_failedExchanges += 1
setTimeout(exchangeKeyOrRetry, delay)
return
bootstrapper.done = new Promise (resolve) ->
bootstrapper.doneBootstrapping = ->
configJson.getAll()
.then (userConfig) ->
bootstrapper.bootstrapped = true
resolve(userConfig)
# If we're still using an old api key we can try to exchange it for a valid device key
# This will only be the case when the supervisor/OS has been updated.
if userConfig.apiKey?
exchangeKeyOrRetry()
else
Promise.join(
knex('config').select('value').where(key: 'apiKey')
knex('config').select('value').where(key: 'deviceApiKey')
([ apiKey ], [ deviceApiKey ]) ->
if !deviceApiKey?.value
# apiKey in the DB is actually the deviceApiKey, but it was
# exchanged in a supervisor version that didn't save it to the DB
# (which mainly affects the RESIN_API_KEY env var)
knex('config').insert({ key: 'deviceApiKey', value: apiKey.value })
)
return
bootstrapper.bootstrapped = false
bootstrapper.startBootstrapping = ->
# Load config file
configJson.init()
.then ->
configJson.getAll()
.then (userConfig) ->
bootstrapper.offlineMode = !Boolean(config.apiEndpoint) or Boolean(userConfig.supervisorOfflineMode)
knex('config').select('value').where(key: 'uuid')
.then ([ uuid ]) ->
if uuid?.value
bootstrapper.doneBootstrapping() if !bootstrapper.offlineMode
return uuid.value
console.log('New device detected. Bootstrapping..')
generateRegistration()
.tap ->
loadPreloadedApps()
.tap (uuid) ->
bootstrapOrRetry() if !bootstrapper.offlineMode
# Don't wait on bootstrapping here, bootstrapper.done is for that.
return
module.exports = bootstrapper
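For reference, the (removed) entry point consumed this module roughly as follows: startBootstrapping resolves with the uuid as soon as one exists, while bootstrapper.done resolves with the full config only once registration has completed.

# bootstrap = require './bootstrap'
# bootstrap.startBootstrapping()
# .then (uuid) ->
#   # uuid is available immediately; provisioning may still be in progress
#   bootstrap.done
# .then (userConfig) ->
#   # fully registered: userConfig.uuid, userConfig.deviceApiKey, ...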


@@ -1,46 +0,0 @@
Promise = require 'bluebird'
_ = require 'lodash'
configPath = '/boot/config.json'
Lock = require 'rwlock'
fs = Promise.promisifyAll(require('fs'))
{ writeAndSyncFile } = require './lib/fs-utils'
lock = new Lock()
writeLock = Promise.promisify(lock.async.writeLock)
readLock = Promise.promisify(lock.async.readLock)
withWriteLock = do ->
_takeLock = ->
writeLock('config')
.disposer (release) ->
release()
return (func) ->
Promise.using(_takeLock(), func)
withReadLock = do ->
_takeLock = ->
readLock('config')
.disposer (release) ->
release()
return (func) ->
Promise.using(_takeLock(), func)
# write-through cache of the config.json file
userConfig = null
exports.init = ->
fs.readFileAsync(configPath)
.then (conf) ->
userConfig = JSON.parse(conf)
exports.get = (key) ->
withReadLock ->
return userConfig[key]
exports.getAll = ->
withReadLock ->
return _.clone(userConfig)
exports.set = (vals = {}, keysToDelete = []) ->
withWriteLock ->
_.merge(userConfig, vals)
for key in keysToDelete
delete userConfig[key]
writeAndSyncFile(configPath, JSON.stringify(userConfig))
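Typical usage of this write-through cache (as in bootstrap.coffee above): initialise once, then read and write through the locked accessors.

# configJson = require './config-json'
# configJson.init()
# .then ->
#   configJson.get('uuid')
# .then (uuid) ->
#   # set one key and delete another under the write lock
#   configJson.set({ deviceApiKey: 'NEWKEY' }, [ 'apiKey' ])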


@@ -1,271 +0,0 @@
_ = require 'lodash'
Promise = require 'bluebird'
memoizee = require 'memoizee'
utils = require './utils'
{ resinApi } = require './request'
device = exports
config = require './config'
configPath = '/boot/config.json'
execAsync = Promise.promisify(require('child_process').exec)
fs = Promise.promisifyAll(require('fs'))
bootstrap = require './bootstrap'
{ checkTruthy } = require './lib/validation'
osRelease = require './lib/os-release'
EventEmitter = require 'events'
# If we don't use promise: 'then', exceptions will crash the program
memoizePromise = _.partial(memoizee, _, promise: 'then')
exports.getID = memoizePromise ->
bootstrap.done
.then ->
Promise.all([
utils.getConfig('apiKey')
utils.getConfig('uuid')
])
.spread (apiKey, uuid) ->
resinApi.get(
resource: 'device'
options:
select: 'id'
filter:
uuid: uuid
customOptions:
apikey: apiKey
)
.timeout(config.apiTimeout)
.then (devices) ->
if devices.length is 0
throw new Error('Could not find this device?!')
return devices[0].id
exports.shuttingDown = false
exports.events = new EventEmitter()
exports.reboot = ->
utils.gosuper.postAsync('/v1/reboot', { json: true })
.spread (res, body) ->
if res.statusCode != 202
throw new Error(body.Error)
exports.shuttingDown = true
exports.events.emit('shutdown')
return body
exports.shutdown = ->
utils.gosuper.postAsync('/v1/shutdown', { json: true })
.spread (res, body) ->
if res.statusCode != 202
throw new Error(body.Error)
exports.shuttingDown = true
exports.events.emit('shutdown')
return body
exports.hostConfigConfigVarPrefix = 'RESIN_HOST_'
bootConfigEnvVarPrefix = 'RESIN_HOST_CONFIG_'
bootBlockDevice = '/dev/mmcblk0p1'
bootMountPoint = '/mnt/root' + config.bootMountPoint
bootConfigPath = bootMountPoint + '/config.txt'
configRegex = new RegExp('(' + _.escapeRegExp(bootConfigEnvVarPrefix) + ')(.+)')
forbiddenConfigKeys = [
'disable_commandline_tags'
'cmdline'
'kernel'
'kernel_address'
'kernel_old'
'ramfsfile'
'ramfsaddr'
'initramfs'
'device_tree_address'
'init_uart_baud'
'init_uart_clock'
'init_emmc_clock'
'boot_delay'
'boot_delay_ms'
'avoid_safe_mode'
]
parseBootConfigFromEnv = (env) ->
# We ensure env doesn't have garbage
parsedEnv = _.pickBy env, (val, key) ->
return _.startsWith(key, bootConfigEnvVarPrefix)
parsedEnv = _.mapKeys parsedEnv, (val, key) ->
key.replace(configRegex, '$2')
parsedEnv = _.omit(parsedEnv, forbiddenConfigKeys)
return parsedEnv
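For example, only variables with the RESIN_HOST_CONFIG_ prefix survive, the prefix is stripped, and forbidden keys are dropped:

# parseBootConfigFromEnv({
#   RESIN_HOST_CONFIG_gpu_mem: '256'
#   RESIN_HOST_CONFIG_initramfs: 'initramf.gz 0x00800000'
#   RESIN_SUPERVISOR_DELTA: '1'
# })
#   => { gpu_mem: '256' }   # initramfs is forbidden; the DELTA var lacks the prefix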
exports.setHostConfig = (env, oldEnv, logMessage) ->
Promise.join setBootConfig(env, oldEnv, logMessage), setLogToDisplay(env, oldEnv, logMessage), (bootConfigApplied, logToDisplayChanged) ->
return (bootConfigApplied or logToDisplayChanged)
setLogToDisplay = (env, oldEnv, logMessage) ->
if env['RESIN_HOST_LOG_TO_DISPLAY']?
enable = checkTruthy(env['RESIN_HOST_LOG_TO_DISPLAY']) ? true
utils.gosuper.postAsync('/v1/set-log-to-display', { json: true, body: Enable: enable })
.spread (response, body) ->
if response.statusCode != 200
logMessage("Error setting log to display: #{body.Error}, Status:, #{response.statusCode}", { error: body.Error }, 'Set log to display error')
return false
else
if body.Data == true
logMessage("#{if enable then 'Enabled' else 'Disabled'} logs to display")
return body.Data
.catch (err) ->
logMessage("Error setting log to display: #{err}", { error: err }, 'Set log to display error')
return false
else
return Promise.resolve(false)
setBootConfig = (env, oldEnv, logMessage) ->
device.getDeviceType()
.then (deviceType) ->
throw new Error('This is not a Raspberry Pi') if !_.startsWith(deviceType, 'raspberry')
Promise.join parseBootConfigFromEnv(env), parseBootConfigFromEnv(oldEnv), fs.readFileAsync(bootConfigPath, 'utf8'), (configFromApp, oldConfigFromApp, configTxt) ->
throw new Error('No boot config to change') if _.isEqual(configFromApp, oldConfigFromApp)
configFromFS = {}
configPositions = []
configStatements = configTxt.split(/\r?\n/)
_.each configStatements, (configStr) ->
keyValue = /^([^#=]+)=(.+)/.exec(configStr)
if keyValue?
configPositions.push(keyValue[1])
configFromFS[keyValue[1]] = keyValue[2]
else
# Keep non key=value lines (comments and [filter] sections) in place so config.txt filters stay in order
configPositions.push(configStr)
# configFromApp and configFromFS now have compatible formats
keysFromApp = _.keys(configFromApp)
keysFromOldConf = _.keys(oldConfigFromApp)
keysFromFS = _.keys(configFromFS)
toBeAdded = _.difference(keysFromApp, keysFromFS)
toBeDeleted = _.difference(keysFromOldConf, keysFromApp)
toBeChanged = _.intersection(keysFromApp, keysFromFS)
toBeChanged = _.filter toBeChanged, (key) ->
configFromApp[key] != configFromFS[key]
throw new Error('Nothing to change') if _.isEmpty(toBeChanged) and _.isEmpty(toBeAdded) and _.isEmpty(toBeDeleted)
logMessage("Applying boot config: #{JSON.stringify(configFromApp)}", {}, 'Apply boot config in progress')
# We add the keys to be added first so they are out of any filters
outputConfig = _.map toBeAdded, (key) -> "#{key}=#{configFromApp[key]}"
outputConfig = outputConfig.concat _.map configPositions, (key, index) ->
configStatement = null
if _.includes(toBeChanged, key)
configStatement = "#{key}=#{configFromApp[key]}"
else if !_.includes(toBeDeleted, key)
configStatement = configStatements[index]
return configStatement
# Here's the dangerous part:
execAsync("mount -t vfat -o remount,rw #{bootBlockDevice} #{bootMountPoint}")
.then ->
fs.writeFileAsync(bootConfigPath + '.new', _.reject(outputConfig, _.isNil).join('\n'))
.then ->
fs.renameAsync(bootConfigPath + '.new', bootConfigPath)
.then ->
execAsync('sync')
.then ->
logMessage("Applied boot config: #{JSON.stringify(configFromApp)}", {}, 'Apply boot config success')
return true
.catch (err) ->
logMessage("Error setting boot config: #{err}", { error: err }, 'Apply boot config error')
throw err
.catch (err) ->
console.log('Will not set boot config: ', err)
return false
exports.getDeviceType = memoizePromise ->
fs.readFileAsync(configPath, 'utf8')
.then(JSON.parse)
.then (configFromFile) ->
if !configFromFile.deviceType?
throw new Error('Device type not specified in config file')
return configFromFile.deviceType
do ->
APPLY_STATE_SUCCESS_DELAY = 1000
APPLY_STATE_RETRY_DELAY = 5000
applyPending = false
targetState = {}
actualState = {}
updateState = { update_pending: false, update_failed: false, update_downloaded: false }
reportErrors = 0
exports.stateReportHealthy = ->
return !(utils.isConnectivityCheckEnabled() and utils.connected() and reportErrors > 3)
getStateDiff = ->
_.omitBy targetState, (value, key) ->
actualState[key] is value
applyState = ->
stateDiff = getStateDiff()
if _.size(stateDiff) is 0
applyPending = false
return
applyPending = true
Promise.join(
utils.getConfig('apiKey')
device.getID()
(apiKey, deviceID) ->
stateDiff = getStateDiff()
if _.size(stateDiff) is 0 || !apiKey?
return
resinApi.patch
resource: 'device'
id: deviceID
body: stateDiff
customOptions:
apikey: apiKey
.timeout(config.apiTimeout)
.then ->
reportErrors = 0
# Update the actual state.
_.merge(actualState, stateDiff)
)
.delay(APPLY_STATE_SUCCESS_DELAY)
.catch (error) ->
reportErrors += 1
utils.mixpanelTrack('Device info update failure', { error, stateDiff })
# Delay 5s before retrying a failed update
Promise.delay(APPLY_STATE_RETRY_DELAY)
.finally ->
# Check if any more state diffs have appeared whilst we've been processing this update.
setImmediate(applyState)
exports.setUpdateState = (value) ->
_.merge(updateState, value)
exports.getState = ->
fieldsToOmit = ['api_secret', 'logs_channel', 'provisioning_progress', 'provisioning_state']
state = _.omit(targetState, fieldsToOmit)
_.merge(state, updateState)
return state
# Calling this function updates the local device state, which is then used to synchronise
# the remote device state, repeating any failed updates until successfully synchronised.
# This function will also optimise updates by merging multiple updates and only sending the latest state.
exports.updateState = (updatedState = {}, retry = false) ->
# Merge the updates into the target state; getStateDiff will drop anything already reported.
_.merge(targetState, updatedState)
# Only trigger applying state if an apply isn't already in progress.
if !applyPending
applyState()
return
exports.getOSVersion = memoizePromise ->
osRelease.getOSVersion(config.hostOSVersionPath)
exports.isResinOSv1 = memoizePromise ->
exports.getOSVersion().then (osVersion) ->
return true if /^Resin OS 1./.test(osVersion)
return false
exports.getOSVariant = memoizePromise ->
osRelease.getOSVariant(config.hostOSVersionPath)
do ->
_gosuperHealthy = true
exports.gosuperHealthy = ->
return _gosuperHealthy
exports.reportUnhealthyGosuper = ->
_gosuperHealthy = false
exports.reportHealthyGosuper = ->
_gosuperHealthy = true
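The state-reporting block earlier in this file only PATCHes fields that differ from the last state it successfully reported. A minimal sketch of that diffing behaviour, assuming plain lodash and illustrative values:

_ = require 'lodash'

actual = { status: 'Idle', ip_address: '10.0.0.2' }
target = { status: 'Downloading', ip_address: '10.0.0.2', download_progress: 10 }

# Same rule as getStateDiff above: keep only keys whose value differs from the actual state
stateDiff = _.omitBy target, (value, key) -> actual[key] is value
console.log(stateDiff)   # { status: 'Downloading', download_progress: 10 }

# After a successful PATCH the diff is folded back into the actual state, so a
# repeated identical update produces an empty diff and no extra request.
_.merge(actual, stateDiff)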

View File

@ -1,223 +0,0 @@
config = require './config'
process.env.DOCKER_HOST ?= "unix://#{config.dockerSocket}"
Docker = require 'docker-toolbelt'
{ DockerProgress } = require 'docker-progress'
Promise = require 'bluebird'
dockerDelta = require 'docker-delta'
_ = require 'lodash'
knex = require './db'
{ request, resumable } = require './request'
Lock = require 'rwlock'
exports.docker = docker = new Docker()
dockerProgress = new DockerProgress(dockerToolbelt: docker)
getRepoAndTag = (image) ->
docker.getRegistryAndName(image)
.then ({ registry, imageName, tagName }) ->
if registry? and registry != 'docker.io'
registry = registry.toString().replace(':443', '') + '/'
else
registry = ''
return { repo: "#{registry}#{imageName}", tag: tagName }
applyDelta = (imgSrc, deltaUrl, applyTimeout, opts, onProgress) ->
new Promise (resolve, reject) ->
req = resumable(Object.assign({ url: deltaUrl }, opts))
.on('progress', onProgress)
.on('retry', onProgress)
.on('error', reject)
.on 'response', (res) ->
if res.statusCode isnt 200
reject(new Error("Got #{res.statusCode} when requesting delta from storage."))
else if parseInt(res.headers['content-length']) is 0
reject(new Error('Invalid delta URL.'))
else
deltaStream = dockerDelta.applyDelta(imgSrc, timeout: applyTimeout)
res.pipe(deltaStream)
.on('id', resolve)
.on('error', req.abort.bind(req))
do ->
_lock = new Lock()
_writeLock = Promise.promisify(_lock.async.writeLock)
_readLock = Promise.promisify(_lock.async.readLock)
writeLockImages = ->
_writeLock('images')
.disposer (release) ->
release()
readLockImages = ->
_readLock('images')
.disposer (release) ->
release()
exports.rsyncImageWithProgress = (imgDest, opts, onProgress) ->
{ requestTimeout, applyTimeout, retryCount, retryInterval, uuid, apiKey, deltaSource, startFromEmpty = false } = opts
Promise.using readLockImages(), ->
Promise.try ->
if startFromEmpty or !deltaSource?
return 'resin/scratch'
else
docker.getImage(deltaSource).inspect()
.then ->
return deltaSource
.catch ->
return 'resin/scratch'
.then (imgSrc) ->
# I'll leave this debug log here in case we ever wonder what delta source a device is using in production
console.log("Using delta source #{imgSrc}")
Promise.join docker.getRegistryAndName(imgDest), docker.getRegistryAndName(imgSrc), (dstInfo, srcInfo) ->
tokenEndpoint = "#{config.apiEndpoint}/auth/v1/token"
opts =
auth:
user: 'd_' + uuid
pass: apiKey
sendImmediately: true
json: true
timeout: requestTimeout
url = "#{tokenEndpoint}?service=#{dstInfo.registry}&scope=repository:#{dstInfo.imageName}:pull&scope=repository:#{srcInfo.imageName}:pull"
request.getAsync(url, opts)
.get(1)
.then (b) ->
opts =
followRedirect: false
timeout: requestTimeout
if b?.token?
opts.auth =
bearer: b.token
sendImmediately: true
new Promise (resolve, reject) ->
request.get("#{config.deltaHost}/api/v2/delta?src=#{imgSrc}&dest=#{imgDest}", opts)
.on 'response', (res) ->
res.resume() # discard response body -- we only care about response headers
if res.statusCode in [ 502, 504 ]
reject(new Error('Delta server is still processing the delta, will retry'))
else if not (300 <= res.statusCode < 400 and res.headers['location']?)
reject(new Error("Got #{res.statusCode} when requesting image from delta server."))
else
deltaUrl = res.headers['location']
if imgSrc is 'resin/scratch'
deltaSrc = null
else
deltaSrc = imgSrc
resumeOpts = { timeout: requestTimeout, maxRetries: retryCount, retryInterval }
resolve(applyDelta(deltaSrc, deltaUrl, applyTimeout, resumeOpts, onProgress))
.on 'error', reject
.then (id) ->
getRepoAndTag(imgDest)
.then ({ repo, tag }) ->
docker.getImage(id).tag({ repo, tag, force: true })
.catch dockerDelta.OutOfSyncError, (err) ->
throw err if startFromEmpty
console.log('Falling back to delta-from-empty')
opts.startFromEmpty = true
exports.rsyncImageWithProgress(imgDest, opts, onProgress)
exports.fetchImageWithProgress = (image, onProgress, { uuid, apiKey }) ->
Promise.using readLockImages(), ->
docker.getRegistryAndName(image)
.then ({ registry, imageName, tagName }) ->
dockerOptions =
authconfig:
username: 'd_' + uuid,
password: apiKey,
serveraddress: registry
dockerProgress.pull(image, onProgress, dockerOptions)
normalizeRepoTag = (image) ->
getRepoAndTag(image)
.then ({ repo, tag }) ->
buildRepoTag(repo, tag)
supervisorTagPromise = normalizeRepoTag(config.supervisorImage)
exports.cleanupContainersAndImages = (extraImagesToIgnore = []) ->
Promise.using writeLockImages(), ->
Promise.join(
knex('app').select()
.map (app) ->
app.imageId = normalizeRepoTag(app.imageId)
return Promise.props(app)
knex('dependentApp').select().whereNotNull('imageId')
.map ({ imageId }) ->
return normalizeRepoTag(imageId)
supervisorTagPromise
docker.listImages()
.map (image) ->
image.NormalizedRepoTags = Promise.map(image.RepoTags, normalizeRepoTag)
return Promise.props(image)
Promise.map(extraImagesToIgnore, normalizeRepoTag)
(apps, dependentApps, supervisorTag, images, normalizedExtraImages) ->
appNames = _.map(apps, 'containerName')
appImages = _.map(apps, 'imageId')
imageTags = _.map(images, 'NormalizedRepoTags')
appTags = _.filter imageTags, (tags) ->
_.some tags, (tag) ->
_.includes(appImages, tag) or _.includes(dependentApps, tag)
appTags = _.flatten(appTags)
supervisorTags = _.filter imageTags, (tags) ->
_.includes(tags, supervisorTag)
supervisorTags = _.flatten(supervisorTags)
extraTags = _.filter imageTags, (tags) ->
_.some(tags, (tag) -> _.includes(normalizedExtraImages, tag))
extraTags = _.flatten(extraTags)
allProtectedTags = _.union(appTags, supervisorTags, extraTags)
# Clean up containers first, so that they don't block image removal.
docker.listContainers(all: true)
.filter (containerInfo) ->
# Do not remove user apps.
normalizeRepoTag(containerInfo.Image)
.then (repoTag) ->
if _.includes(appTags, repoTag)
return !_.some containerInfo.Names, (name) ->
_.some appNames, (appContainerName) -> "/#{appContainerName}" == name
if _.includes(supervisorTags, repoTag)
return containerHasExited(containerInfo.Id)
return true
.map (containerInfo) ->
docker.getContainer(containerInfo.Id).remove(v: true, force: true)
.then ->
console.log('Deleted container:', containerInfo.Id, containerInfo.Image)
.catch(_.noop)
.then ->
imagesToClean = _.reject images, (image) ->
_.some(image.NormalizedRepoTags, (tag) -> _.includes(allProtectedTags, tag))
Promise.map imagesToClean, (image) ->
Promise.map image.RepoTags.concat(image.Id), (tag) ->
docker.getImage(tag).remove(force: true)
.then ->
console.log('Deleted image:', tag, image.Id, image.RepoTags)
.catch(_.noop)
)
containerHasExited = (id) ->
docker.getContainer(id).inspect()
.then (data) ->
return not data.State.Running
buildRepoTag = (repo, tag, registry) ->
repoTag = ''
if registry?
repoTag += registry + '/'
repoTag += repo
if tag?
repoTag += ':' + tag
else
repoTag += ':latest'
return repoTag
exports.getImageEnv = (id) ->
docker.getImage(id).inspect()
.get('Config').get('Env')
.then (env) ->
# env is an array of strings that say 'key=value'
_(env)
.invokeMap('split', '=')
.fromPairs()
.value()
.catch (err) ->
console.log('Error getting env from image', err, err.stack)
return {}
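Image comparisons in cleanupContainersAndImages only work because every reference is first normalised to the same repo:tag form. A small sketch of the expected shapes (values illustrative; the ':443' stripping mirrors getRepoAndTag above):

# 'registry.resinstaging.io:443/app/12345' with tag 'latest'
#   -> repo 'registry.resinstaging.io/app/12345', tag 'latest'
#   -> buildRepoTag(repo, tag) -> 'registry.resinstaging.io/app/12345:latest'
# 'resin/armv7hf-supervisor' with no tag
#   -> buildRepoTag('resin/armv7hf-supervisor', null) -> 'resin/armv7hf-supervisor:latest'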

View File

@ -1,106 +0,0 @@
_ = require 'lodash'
Docker = require 'docker-toolbelt'
PUBNUB = require 'pubnub'
Promise = require 'bluebird'
es = require 'event-stream'
Lock = require 'rwlock'
{ docker } = require '../docker-utils'
LOG_PUBLISH_INTERVAL = 110
# Pubnub's message size limit is 32KB (unclear whether that means KB or KiB, so we
# stay well below it). We cap each batch of messages at MAX_LOG_BYTE_SIZE bytes, leaving
# room for the [ and ] of the surrounding array.
MAX_LOG_BYTE_SIZE = 30000
MAX_MESSAGE_INDEX = 9
disableLogs = false
initialised = new Promise (resolve) ->
exports.init = (config) ->
resolve(config)
# Queue up any calls to publish logs whilst we wait to be initialised.
publish = do ->
publishQueue = [[]]
messageIndex = 0
publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE
logsOverflow = false
initialised.then (config) ->
if config.offlineMode
publish = _.noop
publishQueue = null
return
pubnub = PUBNUB.init(config.pubnub)
channel = config.channel
doPublish = ->
return if publishQueue[0].length is 0
message = publishQueue.shift()
pubnub.publish({ channel, message })
if publishQueue.length is 0
publishQueue = [[]]
publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE
messageIndex = Math.max(messageIndex - 1, 0)
logsOverflow = false if messageIndex < MAX_MESSAGE_INDEX
setInterval(doPublish, LOG_PUBLISH_INTERVAL)
return (message) ->
# Disable sending logs for bandwidth control
return if disableLogs or (messageIndex >= MAX_MESSAGE_INDEX and publishQueueRemainingBytes <= 0)
if _.isString(message)
message = { m: message }
_.defaults message,
t: Date.now()
m: ''
msgLength = Buffer.byteLength(encodeURIComponent(JSON.stringify(message)), 'utf8')
return if msgLength > MAX_LOG_BYTE_SIZE # Unlikely, but we can't allow this
remaining = publishQueueRemainingBytes - msgLength
if remaining >= 0
publishQueue[messageIndex].push(message)
publishQueueRemainingBytes = remaining
else if messageIndex < MAX_MESSAGE_INDEX
messageIndex += 1
publishQueue[messageIndex] = [ message ]
publishQueueRemainingBytes = MAX_LOG_BYTE_SIZE - msgLength
else if !logsOverflow
logsOverflow = true
messageIndex += 1
publishQueue[messageIndex] = [ { m: 'Warning! Some logs dropped due to high load', t: Date.now(), s: 1 } ]
publishQueueRemainingBytes = 0
# disable: A Boolean to pause log publishing - logs are lost while paused.
exports.disableLogPublishing = (disable) ->
disableLogs = disable
exports.log = ->
publish(arguments...)
do ->
_lock = new Lock()
_writeLock = Promise.promisify(_lock.async.writeLock)
loggerLock = (containerName) ->
_writeLock(containerName)
.disposer (release) ->
release()
attached = {}
exports.attach = (app) ->
Promise.using loggerLock(app.containerName), ->
if !attached[app.containerName]
docker.getContainer(app.containerName)
.logs({ follow: true, stdout: true, stderr: true, timestamps: true })
.then (stream) ->
attached[app.containerName] = true
stream.pipe(es.split())
.on 'data', (logLine) ->
space = logLine.indexOf(' ')
if space > 0
msg = { t: logLine.substr(0, space), m: logLine.substr(space + 1) }
publish(msg)
.on 'error', (err) ->
console.error('Error on container logs', err, err.stack)
attached[app.containerName] = false
.on 'end', ->
attached[app.containerName] = false
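The publisher above batches messages into arrays bounded by a byte budget, starting a new batch (up to MAX_MESSAGE_INDEX of them) when the budget runs out and dropping logs with a warning beyond that. A minimal sketch of the budgeting rule, with illustrative names and without the overflow handling:

MAX_BATCH_BYTES = 30000                 # mirrors MAX_LOG_BYTE_SIZE above
batches = [[]]
remaining = MAX_BATCH_BYTES

push = (msg) ->
  size = Buffer.byteLength(JSON.stringify(msg), 'utf8')
  if remaining - size >= 0
    batches[batches.length - 1].push(msg)   # fits in the current batch
    remaining -= size
  else
    batches.push([ msg ])                   # start a new batch for the next publish tick
    remaining = MAX_BATCH_BYTES - size

push({ m: 'hello', t: Date.now() })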

View File

@ -1,9 +0,0 @@
Promise = require('bluebird')
crypto = require('crypto')
exports.generate = (length = 32, callback) ->
Promise.try ->
crypto.randomBytes(length).toString('hex')
.catch ->
# Retry after a short delay; pass length through so retries honour the caller's requested size
Promise.delay(1).then(-> exports.generate(length))
.nodeify(callback)
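A usage sketch (the require path matches how utils.coffee loads this module):

randomHexString = require './lib/random-hex-string'

randomHexString.generate()          # 32 random bytes -> a 64-character hex string
.then (secret) ->
  console.log('new secret:', secret)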

View File

@ -1,52 +0,0 @@
config = require './config'
PlatformAPI = require 'pinejs-client'
Promise = require 'bluebird'
request = require 'request'
resumable = require 'resumable-request'
url = require 'url'
osRelease = require './lib/os-release'
osVersion = osRelease.getOSVersionSync(config.hostOSVersionPath)
osVariant = osRelease.getOSVariantSync(config.hostOSVersionPath)
supervisorVersion = require('./lib/supervisor-version')
userAgent = "Supervisor/#{supervisorVersion}"
if osVersion?
if osVariant?
userAgent += " (Linux; #{osVersion}; #{osVariant})"
else
userAgent += " (Linux; #{osVersion})"
# With these settings, the device must be unable to receive a single byte
# from the network for a continuous period of 20 minutes before we give up.
# (reqTimeout + retryInterval) * retryCount / 1000ms / 60sec ~> minutes
DEFAULT_REQUEST_TIMEOUT = 30000 # ms
DEFAULT_REQUEST_RETRY_INTERVAL = 10000 # ms
DEFAULT_REQUEST_RETRY_COUNT = 30
requestOpts =
gzip: true
timeout: DEFAULT_REQUEST_TIMEOUT
headers:
'User-Agent': userAgent
resumableOpts =
timeout: DEFAULT_REQUEST_TIMEOUT
maxRetries: DEFAULT_REQUEST_RETRY_COUNT
retryInterval: DEFAULT_REQUEST_RETRY_INTERVAL
try
PLATFORM_ENDPOINT = url.resolve(config.apiEndpoint, '/v2/')
exports.resinApi = resinApi = new PlatformAPI
apiPrefix: PLATFORM_ENDPOINT
passthrough: requestOpts
exports.cachedResinApi = resinApi.clone({}, cache: {})
catch
exports.resinApi = {}
exports.cachedResinApi = {}
request = request.defaults(requestOpts)
exports.request = Promise.promisifyAll(request, multiArgs: true)
exports.resumable = resumable.defaults(resumableOpts)
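The 20-minute figure in the comment above follows directly from these defaults:

# (30000 ms timeout + 10000 ms retry interval) * 30 retries = 1,200,000 ms = 20 minutes
(DEFAULT_REQUEST_TIMEOUT + DEFAULT_REQUEST_RETRY_INTERVAL) * DEFAULT_REQUEST_RETRY_COUNT / 1000 / 60   # => 20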

View File

@ -1,349 +0,0 @@
Promise = require 'bluebird'
_ = require 'lodash'
fs = Promise.promisifyAll require 'fs'
config = require './config'
knex = require './db'
mixpanel = require 'mixpanel'
networkCheck = require 'network-checker'
blink = require('blinking')(config.ledFile)
url = require 'url'
randomHexString = require './lib/random-hex-string'
{ request } = require './request'
logger = require './lib/logger'
TypedError = require 'typed-error'
execAsync = Promise.promisify(require('child_process').exec)
device = require './device'
{ checkTruthy } = require './lib/validation'
mask = require 'json-mask'
exports.supervisorVersion = require('./lib/supervisor-version')
configJson = JSON.parse(fs.readFileSync('/boot/config.json'))
if Boolean(config.apiEndpoint) and !Boolean(configJson.supervisorOfflineMode)
mixpanelClient = mixpanel.init(config.mixpanelToken, { host: config.mixpanelHost })
else
mixpanelClient = { track: _.noop }
exports.mixpanelProperties = mixpanelProperties =
username: configJson.username
mixpanelMask = [
'appId'
'delay'
'error'
'interval'
'app(appId,imageId,commit,name)'
'stateDiff(status,download_progress,commit,os_version,supervisor_version,ip_address)'
].join(',')
exports.mixpanelTrack = (event, properties = {}) ->
# Allow passing in an error directly and having it assigned to the error property.
if properties instanceof Error
properties = error: properties
# If properties.error is an Error object, serialise its message and stack explicitly,
# rather than letting it JSON-stringify to `{}`
if properties.error instanceof Error
properties.error =
message: properties.error.message
stack: properties.error.stack
properties = _.cloneDeep(properties)
# Filter properties to only send the whitelisted keys and values
properties = mask(properties, mixpanelMask)
console.log('Event:', event, JSON.stringify(properties))
# Mutation is bad, and it should feel bad
properties = _.assign(properties, mixpanelProperties)
mixpanelClient.track(event, properties)
networkPattern =
blinks: 4
pause: 1000
exports.blink = blink
pauseConnectivityCheck = false
disableConnectivityCheck = false
# options: An object of net.connect options, with the addition of:
# timeout: 10s
checkHost = (options) ->
if !isConnectivityCheckEnabled()
return true
else
return networkCheck.checkHost(options)
exports.isConnectivityCheckEnabled = isConnectivityCheckEnabled = ->
return !disableConnectivityCheck and !pauseConnectivityCheck
# Custom monitor that uses checkHost function above.
customMonitor = (options, fn) ->
networkCheck.monitor(checkHost, options, fn)
# pause: A Boolean to pause the connectivity checks
exports.pauseCheck = (pause) ->
pauseConnectivityCheck = pause
# disable: A Boolean to disable the connectivity checks
exports.disableCheck = disableCheck = (disable) ->
disableConnectivityCheck = disable
# Callback for inotify, triggered when the VPN status changes.
vpnStatusInotifyCallback = ->
fs.lstatAsync(config.vpnStatusPath + '/active')
.then ->
pauseConnectivityCheck = true
.catch ->
pauseConnectivityCheck = false
# Use the following to catch EEXIST errors
EEXIST = (err) -> err.code is 'EEXIST'
do ->
_connected = true
exports.connected = ->
return _connected
exports.connectivityCheck = _.once ->
if !config.apiEndpoint?
console.log('No apiEndpoint specified, skipping connectivity check')
return
parsedUrl = url.parse(config.apiEndpoint)
fs.mkdirAsync(config.vpnStatusPath)
.catch EEXIST, (err) ->
console.log('VPN status path exists.')
.then ->
fs.watch(config.vpnStatusPath, vpnStatusInotifyCallback)
# Manually trigger the callback to detect cases where the VPN was switched on before the supervisor started.
vpnStatusInotifyCallback()
customMonitor
host: parsedUrl.hostname
port: parsedUrl.port ? (if parsedUrl.protocol is 'https:' then 443 else 80)
interval: 10 * 1000
(connected) ->
_connected = connected
if connected
console.log('Internet Connectivity: OK')
blink.pattern.stop()
else
console.log('Waiting for connectivity...')
blink.pattern.start(networkPattern)
secretPromises = {}
generateSecret = (name) ->
Promise.try ->
return config.forceSecret[name] if config.forceSecret[name]?
return randomHexString.generate()
.then (newSecret) ->
secretInDB = { key: "#{name}Secret", value: newSecret }
knex('config').update(secretInDB).where(key: "#{name}Secret")
.then (affectedRows) ->
knex('config').insert(secretInDB) if affectedRows == 0
.return(newSecret)
exports.newSecret = (name) ->
secretPromises[name] ?= Promise.resolve()
secretPromises[name] = secretPromises[name].then ->
generateSecret(name)
exports.getOrGenerateSecret = (name) ->
secretPromises[name] ?= knex('config').select('value').where(key: "#{name}Secret").then ([ secret ]) ->
return secret.value if secret?
generateSecret(name)
return secretPromises[name]
exports.getConfig = getConfig = (key) ->
knex('config').select('value').where({ key })
.then ([ conf ]) ->
return conf?.value
exports.setConfig = (key, value = null) ->
knex('config').update({ value }).where({ key })
.then (n) ->
knex('config').insert({ key, value }) if n == 0
exports.extendEnvVars = (env, uuid, apiKey, appId, appName, commit) ->
host = '127.0.0.1'
newEnv =
RESIN_APP_ID: appId.toString()
RESIN_APP_NAME: appName
RESIN_APP_RELEASE: commit
RESIN_DEVICE_UUID: uuid
RESIN_DEVICE_NAME_AT_INIT: getConfig('name')
RESIN_DEVICE_TYPE: device.getDeviceType()
RESIN_HOST_OS_VERSION: device.getOSVersion()
RESIN_SUPERVISOR_ADDRESS: "http://#{host}:#{config.listenPort}"
RESIN_SUPERVISOR_HOST: host
RESIN_SUPERVISOR_PORT: config.listenPort
RESIN_SUPERVISOR_API_KEY: exports.getOrGenerateSecret('api')
RESIN_SUPERVISOR_VERSION: exports.supervisorVersion
RESIN_API_KEY: apiKey
RESIN: '1'
USER: 'root'
if env?
_.defaults(newEnv, env)
return Promise.props(newEnv)
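# For illustration (all values assumed, not taken from the diff): for appId 1234,
# app name 'myapp', commit 'abcd123' and the default listen port, the container would
# see, among others:
#   RESIN_APP_ID=1234, RESIN_APP_NAME=myapp, RESIN_APP_RELEASE=abcd123,
#   RESIN_SUPERVISOR_ADDRESS=http://127.0.0.1:<listenPort>, RESIN=1, USER=root
# Promise-valued entries (device name, device type, OS version, API key) are resolved by Promise.props.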
# Callback function to enable/disable tcp pings
exports.enableConnectivityCheck = (val) ->
enabled = checkTruthy(val) ? true
disableCheck(!enabled)
console.log("Connectivity check enabled: #{enabled}")
return true
# Callback function to enable/disable logs
exports.resinLogControl = (val) ->
logEnabled = checkTruthy(val) ? true
logger.disableLogPublishing(!logEnabled)
console.log('Logs enabled: ' + val)
return true
emptyHostRequest = request.defaults({ headers: Host: '' })
gosuperRequest = (method, endpoint, options = {}, callback) ->
if _.isFunction(options)
callback = options
options = {}
options.method = method
options.url = config.gosuperAddress + endpoint
emptyHostRequest(options, callback)
gosuperPost = _.partial(gosuperRequest, 'POST')
gosuperGet = _.partial(gosuperRequest, 'GET')
exports.gosuper = gosuper =
post: gosuperPost
get: gosuperGet
postAsync: Promise.promisify(gosuperPost, multiArgs: true)
getAsync: Promise.promisify(gosuperGet, multiArgs: true)
# Callback function to enable/disable VPN
exports.vpnControl = (val, logMessage, { initial = false } = {}) ->
enable = checkTruthy(val) ? true
# If it's the initial run, we always want the VPN enabled, so we ignore calls to disable it
if initial and !enable
return Promise.resolve(false)
gosuper.postAsync('/v1/vpncontrol', { json: true, body: Enable: enable })
.spread (response, body) ->
if response.statusCode == 202
console.log('VPN enabled: ' + enable)
return true
else
logMessage("Error (#{response.statusCode}) toggling VPN: #{body}", {}, 'Toggle VPN error')
return false
.catchReturn(false)
exports.restartSystemdService = (serviceName) ->
gosuper.postAsync('/v1/restart-service', { json: true, body: Name: serviceName })
.spread (response, body) ->
if response.statusCode != 200
err = new Error("Error restarting service #{serviceName}: #{response.statusCode} #{body}")
err.statusCode = response.statusCode
throw err
exports.AppNotFoundError = class AppNotFoundError extends TypedError
exports.getKnexApp = (appId, columns) ->
knex('app').select(columns).where({ appId })
.then ([ app ]) ->
if !app?
throw new AppNotFoundError('App not found')
return app
exports.getKnexApps = (columns) ->
knex('app').select(columns)
exports.defaultVolumes = (includeV1Volumes) ->
volumes = {
'/data': {}
'/lib/modules': {}
'/lib/firmware': {}
'/host/run/dbus': {}
}
if includeV1Volumes
volumes['/host/var/lib/connman'] = {}
volumes['/host_run/dbus'] = {}
return volumes
exports.getDataPath = (identifier) ->
return config.dataPath + '/' + identifier
exports.defaultBinds = (dataPath, includeV1Binds) ->
binds = [
exports.getDataPath(dataPath) + ':/data'
"/tmp/resin-supervisor/#{dataPath}:/tmp/resin"
'/lib/modules:/lib/modules'
'/lib/firmware:/lib/firmware'
'/run/dbus:/host/run/dbus'
]
if includeV1Binds
binds.push('/run/dbus:/host_run/dbus')
binds.push('/var/lib/connman:/host/var/lib/connman')
return binds
exports.validComposeOptions = [
'command'
'entrypoint'
'environment'
'expose'
'image'
'labels'
'links'
'net'
'network_mode'
'ports'
'privileged'
'restart'
'stop_signal'
'user'
'volumes' # Will be overwritten with the default binds
'working_dir'
]
exports.validContainerOptions = [
'Hostname'
'User'
'Env'
'Labels'
'Cmd'
'Entrypoint'
'Image'
'Volumes'
'WorkingDir'
'ExposedPorts'
'HostConfig'
'Name'
]
exports.validHostConfigOptions = [
'Binds' # Will be overwritten with the default binds
'Links'
'PortBindings'
'Privileged'
'RestartPolicy'
'NetworkMode'
]
exports.validateKeys = (options, validSet) ->
Promise.try ->
return if !options?
invalidKeys = _.keys(_.omit(options, validSet))
throw new Error("Using #{invalidKeys.join(', ')} is not allowed.") if !_.isEmpty(invalidKeys)
checkAndAddIptablesRule = (rule) ->
execAsync("iptables -C #{rule}")
.catch ->
execAsync("iptables -A #{rule}")
exports.createIpTablesRules = ->
allowedInterfaces = ['resin-vpn', 'tun0', 'docker0', 'lo']
Promise.each allowedInterfaces, (iface) ->
checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -i #{iface} -j ACCEPT")
.then ->
checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -j REJECT")
.catch ->
# On systems without REJECT support, fall back to DROP
checkAndAddIptablesRule("INPUT -p tcp --dport #{config.listenPort} -j DROP")