Proxyvisor: implement the Proxyvisor for the multicontainer supervisor

This will be quickly replaced by a newer version with a different API, but for now we needed to maintain backwards compatibility (see #508).

This proxyvisor handles dependent apps and devices with a multicontainer parent app.
It also switches to the new update mechanism by inferring and applying updates step by step.

Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
This commit is contained in:
Pablo Carranza Velez 2017-11-01 01:10:56 -07:00
parent 195697a7e1
commit bc191ee86c

View File

@ -1,28 +1,17 @@
Promise = require 'bluebird' Promise = require 'bluebird'
dockerUtils = require './docker-utils' _ = require 'lodash'
{ docker } = dockerUtils
express = require 'express' express = require 'express'
fs = Promise.promisifyAll require 'fs' fs = Promise.promisifyAll require 'fs'
{ resinApi, request } = require './request' { request } = require './lib/request'
knex = require './db' constants = require './lib/constants'
_ = require 'lodash' path = require 'path'
deviceRegister = require 'resin-register-device' mkdirp = Promise.promisify(require('mkdirp'))
randomHexString = require './lib/random-hex-string'
utils = require './utils'
device = require './device'
bodyParser = require 'body-parser' bodyParser = require 'body-parser'
appConfig = require './config'
PUBNUB = require 'pubnub'
execAsync = Promise.promisify(require('child_process').exec) execAsync = Promise.promisify(require('child_process').exec)
url = require 'url' url = require 'url'
pubnub = PUBNUB.init(appConfig.pubnub)
isDefined = _.negate(_.isUndefined) isDefined = _.negate(_.isUndefined)
exports.router = router = express.Router()
router.use(bodyParser())
parseDeviceFields = (device) -> parseDeviceFields = (device) ->
device.id = parseInt(device.deviceId) device.id = parseInt(device.deviceId)
device.appId = parseInt(device.appId) device.appId = parseInt(device.appId)
@ -32,71 +21,110 @@ parseDeviceFields = (device) ->
device.targetEnvironment = JSON.parse(device.targetEnvironment ? '{}') device.targetEnvironment = JSON.parse(device.targetEnvironment ? '{}')
return _.omit(device, 'markedForDeletion', 'logs_channel') return _.omit(device, 'markedForDeletion', 'logs_channel')
# TODO move to lib/validation
validStringOrUndefined = (s) ->
_.isUndefined(s) or !_.isEmpty(s)
validObjectOrUndefined = (o) ->
_.isUndefined(o) or _.isObject(o)
router.get '/v1/devices', (req, res) -> tarDirectory = (appId) ->
knex('dependentDevice').select() return "/data/dependent-assets/#{appId}"
tarFilename = (appId, commit) ->
return "#{appId}-#{commit}.tar"
tarPath = (appId, commit) ->
return "#{tarDirectory(appId)}/#{tarFilename(appId, commit)}"
getTarArchive = (source, destination) ->
fs.lstatAsync(destination)
.catch ->
mkdirp(path.dirname(destination))
.then ->
execAsync("tar -cvf '#{destination}' *", cwd: source)
cleanupTars = (appId, commit) ->
if commit?
fileToKeep = tarFilename(appId, commit)
else
fileToKeep = null
dir = tarDirectory(appId)
fs.readdirAsync(dir)
.catchReturn([])
.then (files) ->
if fileToKeep?
files = _.filter files, (file) ->
return file isnt fileToKeep
Promise.map files, (file) ->
if !fileToKeep? or (file isnt fileToKeep)
fs.unlinkAsync(path.join(dir, file))
formatTargetAsState = (device) ->
return {
appId: parseInt(device.appId)
commit: device.targetCommit
environment: device.targetEnvironment
config: device.targetConfig
}
formatCurrentAsState = (device) ->
return {
appId: parseInt(device.appId)
commit: device.commit
environment: device.environment
config: device.config
}
class ProxyvisorRouter
constructor: (@proxyvisor) ->
{ @config, @logger, @db, @docker } = @proxyvisor
@router = express.Router()
@router.use(bodyParser.urlencoded(extended: true))
@router.use(bodyParser.json())
@router.get '/v1/devices', (req, res) =>
@db.models('dependentDevice').select()
.map(parseDeviceFields) .map(parseDeviceFields)
.then (devices) -> .then (devices) ->
res.json(devices) res.json(devices)
.catch (err) -> .catch (err) ->
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
router.post '/v1/devices', (req, res) -> @router.post '/v1/devices', (req, res) =>
{ appId, device_type } = req.body { appId, device_type } = req.body
if !appId? or _.isNaN(parseInt(appId)) or parseInt(appId) <= 0 if !appId? or _.isNaN(parseInt(appId)) or parseInt(appId) <= 0
res.status(400).send('appId must be a positive integer') res.status(400).send('appId must be a positive integer')
return return
device_type = 'generic-amd64' if !device_type? device_type = 'generic-amd64' if !device_type?
Promise.join(
utils.getConfig('apiKey')
utils.getConfig('userId')
device.getID()
randomHexString.generate()
(apiKey, userId, deviceId, logsChannel) ->
uuid = deviceRegister.generateUniqueKey()
d = d =
user: userId
application: req.body.appId application: req.body.appId
uuid: uuid
device_type: device_type device_type: device_type
device: deviceId @proxyvisor.apiBinder.provisionDependentDevice(d)
registered_at: Math.floor(Date.now() / 1000) .then (dev) =>
logs_channel: logsChannel
status: 'Provisioned'
resinApi.post
resource: 'device'
body: d
customOptions:
apikey: apiKey
.timeout(appConfig.apiTimeout)
.then (dev) ->
# If the response has id: null then something was wrong in the request # If the response has id: null then something was wrong in the request
# but we don't know precisely what. # but we don't know precisely what.
if !dev.id? if !dev.id?
res.status(400).send('Provisioning failed, invalid appId or credentials') res.status(400).send('Provisioning failed, invalid appId or credentials')
return return
deviceForDB = { deviceForDB = {
uuid: uuid uuid: dev.uuid
appId: appId appId
device_type: d.device_type device_type: dev.device_type
deviceId: dev.id deviceId: dev.id
name: dev.name name: dev.name
status: d.status status: dev.status
logs_channel: d.logs_channel logs_channel: dev.logs_channel
} }
knex('dependentDevice').insert(deviceForDB) @db.models('dependentDevice').insert(deviceForDB)
.then -> .then ->
res.status(201).send(dev) res.status(201).send(dev)
)
.catch (err) -> .catch (err) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
router.get '/v1/devices/:uuid', (req, res) -> @router.get '/v1/devices/:uuid', (req, res) =>
uuid = req.params.uuid uuid = req.params.uuid
knex('dependentDevice').select().where({ uuid }) @db.models('dependentDevice').select().where({ uuid })
.then ([ device ]) -> .then ([ device ]) ->
return res.status(404).send('Device not found') if !device? return res.status(404).send('Device not found') if !device?
return res.status(410).send('Device deleted') if device.markedForDeletion return res.status(410).send('Device deleted') if device.markedForDeletion
@ -105,7 +133,7 @@ router.get '/v1/devices/:uuid', (req, res) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
router.post '/v1/devices/:uuid/logs', (req, res) -> @router.post '/v1/devices/:uuid/logs', (req, res) =>
uuid = req.params.uuid uuid = req.params.uuid
m = { m = {
message: req.body.message message: req.body.message
@ -113,86 +141,74 @@ router.post '/v1/devices/:uuid/logs', (req, res) ->
} }
m.isSystem = req.body.isSystem if req.body.isSystem? m.isSystem = req.body.isSystem if req.body.isSystem?
knex('dependentDevice').select().where({ uuid }) @db.models('dependentDevice').select().where({ uuid })
.then ([ device ]) -> .then ([ device ]) =>
return res.status(404).send('Device not found') if !device? return res.status(404).send('Device not found') if !device?
return res.status(410).send('Device deleted') if device.markedForDeletion return res.status(410).send('Device deleted') if device.markedForDeletion
pubnub.publish({ channel: "device-#{device.logs_channel}-logs", message: m }) @logger.log(m, { channel: "device-#{device.logs_channel}-logs" })
res.status(202).send('OK') res.status(202).send('OK')
.catch (err) -> .catch (err) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
validStringOrUndefined = (s) -> @router.put '/v1/devices/:uuid', (req, res) =>
_.isUndefined(s) or !_.isEmpty(s)
validObjectOrUndefined = (o) ->
_.isUndefined(o) or _.isObject(o)
router.put '/v1/devices/:uuid', (req, res) ->
uuid = req.params.uuid uuid = req.params.uuid
{ status, is_online, commit, environment, config } = req.body { status, is_online, commit, releaseId, environment, config } = req.body
validateDeviceFields = ->
if isDefined(is_online) and !_.isBoolean(is_online) if isDefined(is_online) and !_.isBoolean(is_online)
res.status(400).send('is_online must be a boolean') return 'is_online must be a boolean'
return
if !validStringOrUndefined(status) if !validStringOrUndefined(status)
res.status(400).send('status must be a non-empty string') return 'status must be a non-empty string'
return
if !validStringOrUndefined(commit) if !validStringOrUndefined(commit)
res.status(400).send('commit must be a non-empty string') return 'commit must be a non-empty string'
return if !validStringOrUndefined(releaseId)
return 'commit must be a non-empty string'
if !validObjectOrUndefined(environment) if !validObjectOrUndefined(environment)
res.status(400).send('environment must be an object') return 'environment must be an object'
return
if !validObjectOrUndefined(config) if !validObjectOrUndefined(config)
res.status(400).send('config must be an object') return 'config must be an object'
return null
requestError = validateDeviceFields()
if requestError?
res.status(400).send(requestError)
return return
environment = JSON.stringify(environment) if isDefined(environment) environment = JSON.stringify(environment) if isDefined(environment)
config = JSON.stringify(config) if isDefined(config) config = JSON.stringify(config) if isDefined(config)
fieldsToUpdateOnDB = _.pickBy({ status, is_online, commit, config, environment }, isDefined) fieldsToUpdateOnDB = _.pickBy({ status, is_online, commit, releaseId, config, environment }, isDefined)
fieldsToUpdateOnAPI = _.pick(fieldsToUpdateOnDB, 'status', 'is_online', 'commit') fieldsToUpdateOnAPI = _.pick(fieldsToUpdateOnDB, 'status', 'is_online', 'commit', 'releaseId')
if _.isEmpty(fieldsToUpdateOnDB) if _.isEmpty(fieldsToUpdateOnDB)
res.status(400).send('At least one device attribute must be updated') res.status(400).send('At least one device attribute must be updated')
return return
Promise.join( @db.models('dependentDevice').select().where({ uuid })
utils.getConfig('apiKey') .then ([ device ]) =>
knex('dependentDevice').select().where({ uuid })
(apiKey, [ device ]) ->
throw new Error('apikey not found') if !apiKey?
return res.status(404).send('Device not found') if !device? return res.status(404).send('Device not found') if !device?
return res.status(410).send('Device deleted') if device.markedForDeletion return res.status(410).send('Device deleted') if device.markedForDeletion
throw new Error('Device is invalid') if !device.deviceId? throw new Error('Device is invalid') if !device.deviceId?
Promise.try -> Promise.try =>
if !_.isEmpty(fieldsToUpdateOnAPI) if !_.isEmpty(fieldsToUpdateOnAPI)
resinApi.patch @proxyvisor.apiBinder.patchDevice(device.deviceId, fieldsToUpdateOnAPI)
resource: 'device' .then =>
id: device.deviceId @db.models('dependentDevice').update(fieldsToUpdateOnDB).where({ uuid })
body: fieldsToUpdateOnAPI .then =>
customOptions: @db.models('dependentDevice').select().where({ uuid })
apikey: apiKey .then ([ device ]) ->
.timeout(appConfig.apiTimeout)
.then ->
knex('dependentDevice').update(fieldsToUpdateOnDB).where({ uuid })
.then ->
res.json(parseDeviceFields(device)) res.json(parseDeviceFields(device))
)
.catch (err) -> .catch (err) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
tarPath = ({ appId, commit }) -> @router.get '/v1/dependent-apps/:appId/assets/:commit', (req, res) =>
return '/tmp/' + appId + '-' + commit + '.tar' @db.models('dependentApp').select().where(_.pick(req.params, 'appId', 'commit'))
.then ([ app ]) =>
router.get '/v1/dependent-apps/:appId/assets/:commit', (req, res) ->
knex('dependentApp').select().where(_.pick(req.params, 'appId', 'commit'))
.then ([ app ]) ->
return res.status(404).send('Not found') if !app return res.status(404).send('Not found') if !app
dest = tarPath(app) dest = tarPath(app.appId, app.commit)
fs.lstatAsync(dest) fs.lstatAsync(dest)
.catch -> .catch =>
Promise.using docker.imageRootDirMounted(app.imageId), (rootDir) -> Promise.using @docker.imageRootDirMounted(app.image), (rootDir) ->
getTarArchive(rootDir + '/assets', dest) getTarArchive(rootDir + '/assets', dest)
.then -> .then ->
res.sendFile(dest) res.sendFile(dest)
@ -200,8 +216,8 @@ router.get '/v1/dependent-apps/:appId/assets/:commit', (req, res) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
router.get '/v1/dependent-apps', (req, res) -> @router.get '/v1/dependent-apps', (req, res) =>
knex('dependentApp').select() @db.models('dependentApp').select()
.map (app) -> .map (app) ->
return { return {
id: parseInt(app.appId) id: parseInt(app.appId)
@ -215,81 +231,41 @@ router.get '/v1/dependent-apps', (req, res) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack) console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
res.status(503).send(err?.message or err or 'Unknown error') res.status(503).send(err?.message or err or 'Unknown error')
getTarArchive = (path, destination) -> module.exports = class Proxyvisor
fs.lstatAsync(path) constructor: ({ @config, @logger, @db, @docker, @images, @applications }) ->
.then -> @acknowledgedState = {}
execAsync("tar -cvf '#{destination}' *", cwd: path) @lastRequestForDevice = {}
@_router = new ProxyvisorRouter(this)
@router = @_router.router
@validActions = [ 'updateDependentTargets', 'sendDependentHooks', 'removeDependentApp' ]
# TODO: deduplicate code from compareForUpdate in application.coffee bindToAPI: (apiBinder) =>
exports.fetchAndSetTargetsForDependentApps = (state, fetchFn, apiKey) -> @apiBinder = apiBinder
knex('dependentApp').select()
.then (localDependentApps) ->
# Compare to see which to fetch, and which to delete
remoteApps = _.mapValues state.apps, (app, appId) ->
conf = app.config ? {}
return {
appId: appId
parentAppId: app.parentApp
imageId: app.image
commit: app.commit
config: JSON.stringify(conf)
name: app.name
}
localApps = _.keyBy(localDependentApps, 'appId')
toBeDownloaded = _.filter remoteApps, (app, appId) -> executeStepAction: (step) =>
return app.commit? and app.imageId? and !_.some(localApps, imageId: app.imageId) Promise.try =>
toBeRemoved = _.filter localApps, (app, appId) -> actions = {
return app.commit? and !_.some(remoteApps, imageId: app.imageId) updateDependentTargets: =>
toBeDeletedFromDB = _(localApps).reject((app, appId) -> remoteApps[appId]?).map('appId').value() @config.getMany([ 'currentApiKey', 'apiTimeout' ])
Promise.map toBeDownloaded, (app) -> .then ({ currentApiKey, apiTimeout }) =>
deltaSource = null # - take each of the step.devices and update dependentDevice with it (targetCommit, targetEnvironment, targetConfig)
if localApps[app.appId]? # - if update returns 0, then use APIBinder to fetch the device, then store it to the db
deltaSource = localApps[app.appId].imageId # - set markedForDeletion: true for devices that are not in the step.devices list
else if !_.isEmpty(localDependentApps) # - update dependentApp with step.app
deltaSource = localDependentApps[0].imageId Promise.map step.devices, (device) =>
fetchFn(app, { deltaSource, setDeviceUpdateState: false }) uuid = device.uuid
.then ->
Promise.map toBeRemoved, (app) ->
fs.unlinkAsync(tarPath(app))
.then ->
docker.getImage(app.imageId).remove()
.catch (err) ->
console.error('Could not remove image/artifacts for dependent app', err, err.stack)
.then ->
Promise.props(
_.mapValues remoteApps, (app, appId) ->
knex('dependentApp').update(app).where({ appId })
.then (n) ->
knex('dependentApp').insert(app) if n == 0
)
.then ->
knex('dependentDevice').del().whereIn('appId', toBeDeletedFromDB)
.then ->
knex('dependentApp').del().whereIn('appId', toBeDeletedFromDB)
.then ->
knex('dependentDevice').update({ markedForDeletion: true }).whereNotIn('uuid', _.keys(state.devices))
.then ->
Promise.all _.map state.devices, (device, uuid) ->
# Only consider one app per dependent device for now # Only consider one app per dependent device for now
appId = _(device.apps).keys().head() appId = _(device.apps).keys().head()
targetCommit = state.apps[appId].commit targetCommit = device.apps[appId].commit
targetEnvironment = JSON.stringify(device.apps[appId].environment ? {}) targetEnvironment = JSON.stringify(device.apps[appId].environment)
targetConfig = JSON.stringify(device.apps[appId].config ? {}) targetConfig = JSON.stringify(device.apps[appId].config)
knex('dependentDevice').update({ targetEnvironment, targetConfig, targetCommit, name: device.name }).where({ uuid }) @db.models('dependentDevice').update({ appId, targetEnvironment, targetConfig, targetCommit, name: device.name }).where({ uuid })
.then (n) -> .then (n) =>
return if n != 0 return if n != 0
# If the device is not in the DB it means it was provisioned externally # If the device is not in the DB it means it was provisioned externally
# so we need to fetch it. # so we need to fetch it.
resinApi.get @apiBinder.fetchDevice(uuid, currentApiKey, apiTimeout)
resource: 'device' .then (dev) =>
options:
filter:
uuid: uuid
customOptions:
apikey: apiKey
.timeout(appConfig.apiTimeout)
.then ([ dev ]) ->
deviceForDB = { deviceForDB = {
uuid: uuid uuid: uuid
appId: appId appId: appId
@ -303,74 +279,354 @@ exports.fetchAndSetTargetsForDependentApps = (state, fetchFn, apiKey) ->
targetConfig targetConfig
targetEnvironment targetEnvironment
} }
knex('dependentDevice').insert(deviceForDB) @db.models('dependentDevice').insert(deviceForDB)
.catch (err) -> .then =>
console.error('Error fetching dependent apps', err, err.stack) @db.models('dependentDevice').where({ appId: step.appId }).whereNotIn('uuid', _.map(step.devices, 'uuid')).update({ markedForDeletion: true })
.then =>
@normaliseDependentAppForDB(step.app)
.then (appForDB) =>
@db.upsertModel('dependentApp', appForDB, { appId: step.appId })
.then ->
cleanupTars(step.appId, step.app.commit)
getHookEndpoint = (appId) -> sendDependentHooks: =>
knex('dependentApp').select('parentAppId').where({ appId }) Promise.join(
.then ([ { parentAppId } ]) -> @config.get('apiTimeout')
utils.getKnexApp(parentAppId) @getHookEndpoint(step.appId)
.then (parentApp) -> (apiTimeout, endpoint) =>
conf = JSON.parse(parentApp.config) Promise.mapSeries step.devices, (device) =>
dockerUtils.getImageEnv(parentApp.imageId) Promise.try =>
.then (imageEnv) -> if @lastRequestForDevice[device.uuid]?
return imageEnv.RESIN_DEPENDENT_DEVICES_HOOK_ADDRESS ? diff = Date.now() - @lastRequestForDevice[device.uuid]
conf.RESIN_DEPENDENT_DEVICES_HOOK_ADDRESS ? if diff < 30000
"#{appConfig.proxyvisorHookReceiver}/v1/devices/" Promise.delay(30001 - diff)
.then =>
@lastRequestForDevice[device.uuid] = Date.now()
if device.markedForDeletion
@sendDeleteHook(device, apiTimeout, endpoint)
else
@sendUpdate(device, apiTimeout, endpoint)
)
formatTargetAsState = (device) -> removeDependentApp: =>
# find step.app and delete it from the DB
# find devices with step.appId and delete them from the DB
@db.transaction (trx) ->
trx('dependentApp').where({ appId: step.appId }).del()
.then ->
trx('dependentDevice').where({ appId: step.appId }).del()
.then ->
cleanupTars(step.appId)
}
throw new Error("Invalid proxyvisor action #{step.action}") if !actions[step.action]?
actions[step.action]()
getCurrentStates: =>
Promise.join(
@db.models('dependentApp').select().map(@normaliseDependentAppFromDB)
@db.models('dependentDevice').select()
(apps, devicesFromDB) ->
devices = _.map devicesFromDB, (device) ->
dev = {
uuid: device.uuid
name: device.name
lock_expiry_date: device.lock_expiry_date
markedForDeletion: device.markedForDeletion
apps: {}
}
dev.apps[device.appId] = {
commit: device.commit
config: JSON.parse(device.config)
environment: JSON.parse(device.environment)
targetCommit: device.targetCommit
targetEnvironment: JSON.parse(device.targetEnvironment)
targetConfig: JSON.parse(device.targetConfig)
}
return dev
return { apps, devices }
)
normaliseDependentAppForDB: (app) =>
if app.image?
image = @images.normalise(app.image)
else
image = null
dbApp = {
appId: app.appId
name: app.name
commit: app.commit
releaseId: app.releaseId
parentApp: app.parentApp
image: image
config: JSON.stringify(app.config ? {})
environment: JSON.stringify(app.environment ? {})
}
return Promise.props(dbApp)
normaliseDependentDeviceTargetForDB: (device, appCommit) ->
Promise.try ->
apps = _.clone(device.apps ? {})
_.forEach apps, (app) ->
app.commit ?= appCommit
app.config ?= {}
app.environment ?= {}
apps = JSON.stringify(apps)
outDevice = {
uuid: device.uuid
name: device.name
apps
}
return outDevice
setTargetInTransaction: (dependent, trx) =>
Promise.try =>
if dependent?.apps?
appsArray = _.map dependent.apps, (app, appId) ->
appClone = _.clone(app)
appClone.appId = appId
return appClone
Promise.map(appsArray, @normaliseDependentAppForDB)
.then (appsForDB) =>
Promise.map appsForDB, (app) =>
@db.upsertModel('dependentAppTarget', app, { appId: app.appId }, trx)
.then ->
trx('dependentAppTarget').whereNotIn('appId', _.map(appsForDB, 'appId')).del()
.then =>
if dependent?.devices?
devicesArray = _.map dependent.devices, (dev, uuid) ->
devClone = _.clone(dev)
devClone.uuid = uuid
return devClone
Promise.map devicesArray, (device) =>
appId = _.keys(device.apps)[0]
@normaliseDependentDeviceTargetForDB(device, dependent.apps[appId]?.commit)
.then (devicesForDB) =>
Promise.map devicesForDB, (device) =>
@db.upsertModel('dependentDeviceTarget', device, { uuid: device.uuid }, trx)
.then ->
trx('dependentDeviceTarget').whereNotIn('uuid', _.map(devicesForDB, 'uuid')).del()
normaliseDependentAppFromDB: (app) ->
Promise.try ->
outApp = {
appId: app.appId
name: app.name
commit: app.commit
releaseId: app.releaseId
image: app.image
config: JSON.parse(app.config)
environment: JSON.parse(app.environment)
parentApp: app.parentApp
}
return outApp
normaliseDependentDeviceTargetFromDB: (device) ->
Promise.try ->
outDevice = {
uuid: device.uuid
name: device.name
apps: JSON.parse(device.apps)
}
return outDevice
normaliseDependentDeviceFromDB: (device) ->
Promise.try ->
outDevice = _.clone(device)
_.forEach [ 'environment', 'config', 'targetEnvironment', 'targetConfig' ], (prop) ->
outDevice[prop] = JSON.parse(device[prop])
return outDevice
getTarget: =>
Promise.props({
apps: @db.models('dependentAppTarget').select().map(@normaliseDependentAppFromDB)
devices: @db.models('dependentDeviceTarget').select().map(@normaliseDependentDeviceTargetFromDB)
})
imagesInUse: (current, target) ->
images = []
if current.dependent?.apps?
_.forEach current.dependent.apps, (app) ->
images.push app.image
if target.dependent?.apps?
_.forEach target.dependent.apps, (app) ->
images.push app.image
return images
_imageAvailable: (image, available) ->
_.some(available, (availableImage) -> availableImage.name == image)
_getHookStep: (currentDevices, appId) =>
hookStep = {
action: 'sendDependentHooks'
devices: []
appId
}
_.forEach currentDevices, (device) =>
if device.markedForDeletion
hookStep.devices.push({
uuid: device.uuid
markedForDeletion: true
})
else
targetState = {
appId
commit: device.apps[appId].targetCommit
config: device.apps[appId].targetConfig
environment: device.apps[appId].targetEnvironment
}
currentState = {
appId
commit: device.apps[appId].commit
config: device.apps[appId].config
environment: device.apps[appId].environment
}
if device.apps[appId].targetCommit? and !_.isEqual(targetState, currentState) and !_.isEqual(targetState, @acknowledgedState[device.uuid])
hookStep.devices.push({
uuid: device.uuid
target: targetState
})
return hookStep
_compareDevices: (currentDevices, targetDevices, appId) ->
currentDeviceTargets = _.map currentDevices, (dev) ->
return null if dev.markedForDeletion
devTarget = _.clone(dev)
delete devTarget.markedForDeletion
devTarget.apps = {}
devTarget.apps[appId] = {
commit: dev.apps[appId].targetCommit
environment: dev.apps[appId].targetEnvironment
config: dev.apps[appId].targetConfig
}
return devTarget
currentDeviceTargets = _.filter(currentDeviceTargets, (dev) -> !_.isNull(dev))
return !_.isEmpty(_.xorWith(currentDeviceTargets, targetDevices, _.isEqual))
imageForDependentApp: (app) ->
return { return {
commit: device.targetCommit name: app.image
environment: device.targetEnvironment imageId: app.imageId
config: device.targetConfig appId: app.appId
dependent: true
} }
do -> getRequiredSteps: (availableImages, current, target, stepsInProgress) =>
acknowledgedState = {} steps = []
sendUpdate = (device, endpoint) -> Promise.try =>
stateToSend = { targetApps = _.keyBy(target.dependent?.apps ? [], 'appId')
appId: parseInt(device.appId) targetAppIds = _.keys(targetApps)
commit: device.targetCommit currentApps = _.keyBy(current.dependent?.apps ? [], 'appId')
environment: JSON.parse(device.targetEnvironment) currentAppIds = _.keys(currentApps)
config: JSON.parse(device.targetConfig) allAppIds = _.union(targetAppIds, currentAppIds)
}
toBeDownloaded = _.filter targetAppIds, (appId) =>
return targetApps[appId].commit? and targetApps[appId].image? and !@_imageAvailable(targetApps[appId].image, availableImages)
appIdsToCheck = _.filter allAppIds, (appId) ->
# - if a step is in progress for this appId, ignore
!_.some(steps.concat(stepsInProgress), (step) -> step.appId == appId)
_.forEach appIdsToCheck, (appId) =>
# - if there's current but not target, push a removeDependentApp step
if !targetApps[appId]?
steps.push({
action: 'removeDependentApp'
appId
})
return
# - if toBeDownloaded includes this app, push a fetch step
if _.includes(toBeDownloaded, appId)
steps.push({
action: 'fetch'
appId
image: @imageForDependentApp(targetApps[appId])
})
return
devicesForApp = (devices) ->
_.filter devices, (d) ->
_.includes(_.keys(d.apps), appId)
currentDevices = devicesForApp(current.dependent.devices)
targetDevices = devicesForApp(target.dependent.devices)
devicesDiffer = @_compareDevices(currentDevices, targetDevices, appId)
# - if current doesn't match target, or the devices differ, push an updateDependentTargets step
if !_.isEqual(currentApps[appId], targetApps[appId]) or devicesDiffer
steps.push({
action: 'updateDependentTargets'
devices: targetDevices
app: targetApps[appId]
appId
})
return
# if we got to this point, the current app is up to date and devices have the
# correct targetCommit, targetEnvironment and targetConfig.
hookStep = @_getHookStep(currentDevices, appId)
if !_.isEmpty(hookStep.devices)
steps.push(hookStep)
.then ->
return steps
getHookEndpoint: (appId) =>
@db.models('dependentApp').select('parentApp').where({ appId })
.then ([ { parentApp } ]) =>
@applications.getTargetApp(parentApp)
.then (parentApp) =>
Promise.map parentApp?.services ? [], (service) =>
@docker.getImageEnv(service.image)
.then (imageEnv) ->
return imageEnv.RESIN_DEPENDENT_DEVICES_HOOK_ADDRESS
.then (imageHookAddresses) ->
for addr in imageHookAddresses
return addr if addr?
return parentApp?.config?.RESIN_DEPENDENT_DEVICES_HOOK_ADDRESS ?
"#{constants.proxyvisorHookReceiver}/v1/devices/"
sendUpdate: (device, timeout, endpoint) =>
request.putAsync "#{endpoint}#{device.uuid}", { request.putAsync "#{endpoint}#{device.uuid}", {
json: true json: true
body: stateToSend body: device.target
} }
.timeout(appConfig.apiTimeout) .timeout(timeout)
.spread (response, body) -> .spread (response, body) =>
if response.statusCode == 200 if response.statusCode == 200
acknowledgedState[device.uuid] = formatTargetAsState(device) @acknowledgedState[device.uuid] = device.target
else else
acknowledgedState[device.uuid] = null @acknowledgedState[device.uuid] = null
throw new Error("Hook returned #{response.statusCode}: #{body}") if response.statusCode != 202 throw new Error("Hook returned #{response.statusCode}: #{body}") if response.statusCode != 202
.catch (err) -> .catch (err) ->
return console.error("Error updating device #{device.uuid}", err, err.stack) return console.error("Error updating device #{device.uuid}", err, err.stack)
sendDeleteHook = (device, endpoint) -> sendDeleteHook: ({ uuid }, timeout, endpoint) =>
uuid = device.uuid
request.delAsync("#{endpoint}#{uuid}") request.delAsync("#{endpoint}#{uuid}")
.timeout(appConfig.apiTimeout) .timeout(timeout)
.spread (response, body) -> .spread (response, body) =>
if response.statusCode == 200 if response.statusCode == 200
knex('dependentDevice').del().where({ uuid }) @db.models('dependentDevice').del().where({ uuid })
else else
throw new Error("Hook returned #{response.statusCode}: #{body}") throw new Error("Hook returned #{response.statusCode}: #{body}")
.catch (err) -> .catch (err) ->
return console.error("Error deleting device #{device.uuid}", err, err.stack) return console.error("Error deleting device #{uuid}", err, err.stack)
exports.sendUpdates = -> sendUpdates: ({ uuid }) =>
endpoints = {} Promise.join(
knex('dependentDevice').select() @db.models('dependentDevice').where({ uuid }).select()
.map (device) -> @config.get('apiTimeout')
currentState = _.pick(device, 'commit', 'environment', 'config') ([ dev ], apiTimeout) =>
if !dev?
console.log("Warning, trying to send update to non-existent device #{uuid}")
return
@normaliseDependentDeviceFromDB(dev)
.then (device) =>
currentState = formatCurrentAsState(device)
targetState = formatTargetAsState(device) targetState = formatTargetAsState(device)
endpoints[device.appId] ?= getHookEndpoint(device.appId) @getHookEndpoint(device.appId)
endpoints[device.appId] .then (endpoint) =>
.then (endpoint) ->
if device.markedForDeletion if device.markedForDeletion
sendDeleteHook(device, endpoint) @sendDeleteHook(device, apiTimeout, endpoint)
else if device.targetCommit? and !_.isEqual(targetState, currentState) and !_.isEqual(targetState, acknowledgedState[device.uuid]) else if device.targetCommit? and !_.isEqual(targetState, currentState) and !_.isEqual(targetState, @acknowledgedState[device.uuid])
sendUpdate(device, endpoint) @sendUpdate(device, targetState, apiTimeout, endpoint)
)