mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-02-01 16:57:57 +00:00
compose: implement the models that make up multicontainer applications
This commit adds models to manage services, images, volumes and networks. The main model for this is ServiceManager, which manages the collection of services on the device. It has functions to query what services are running, and to perform actions like starting, killing or performing handovers. The Service model allows defining the transformations between a container and its service representation, and includes the functions to compare a running service with a target to determine if an update needs to happen. This model includes the relevant compose file entries for a service that are supported. Bind mounts are disallowed except for the ones that relate to supervisor features, and persistent data is now stored in named volumes. The Images model allows fetching and removing images, and includes functionality to determine images that have to be cleaned up - now only dangling and old supervisor images are cleaned up automatically, and ApplicationManager will remove images that correspond to old services that are no longer needed. The Networks and Volumes models allow managing named networks and volumes that are part of composed applications. Changelog-Entry: Remove all bind mounts that were specific to 1.X devices. Move the resin-kill-me file for the handover strategy to /tmp/resin. Add environment variables for the location of resin-kill-me and the lockfile. Use running containers to determine what services are running instead of storing them in the internal database. Use named volumes for persistent data. Change-Type: major Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
This commit is contained in:
parent
be5623cbf1
commit
195697a7e1
195
src/compose/images.coffee
Normal file
195
src/compose/images.coffee
Normal file
@ -0,0 +1,195 @@
|
||||
Promise = require 'bluebird'
|
||||
_ = require 'lodash'
|
||||
EventEmitter = require 'events'
|
||||
logTypes = require '../lib/log-types'
|
||||
constants = require '../lib/constants'
|
||||
validation = require '../lib/validation'
|
||||
|
||||
ImageNotFoundError = (err) ->
|
||||
return "#{err.statusCode}" is '404'
|
||||
|
||||
# image = {
|
||||
# name: image registry/repo:tag
|
||||
# appId
|
||||
# serviceId
|
||||
# serviceName
|
||||
# imageId (from resin API)
|
||||
# releaseId
|
||||
# dependent
|
||||
# status Downloading, Downloaded, Deleting
|
||||
# download_progress
|
||||
# }
|
||||
|
||||
module.exports = class Images extends EventEmitter
|
||||
constructor: ({ @docker, @logger, @db }) ->
|
||||
@imageCleanupFailures = {}
|
||||
# A store of volatile state for images (e.g. download progress), indexed by imageId
|
||||
@volatileState = {}
|
||||
|
||||
reportChange: (imageId, status) ->
|
||||
if status?
|
||||
@volatileState[imageId] ?= { imageId }
|
||||
_.merge(@volatileState[imageId], status)
|
||||
@emit('change')
|
||||
else if imageId? and @volatileState[imageId]?
|
||||
delete @volatileState[imageId]
|
||||
@emit('change')
|
||||
|
||||
fetch: (image, opts) =>
|
||||
onProgress = (progress) =>
|
||||
@reportChange(image.imageId, { download_progress: progress.percentage })
|
||||
|
||||
@normalise(image.name)
|
||||
.then (imageName) =>
|
||||
image = _.clone(image)
|
||||
image.name = imageName
|
||||
@markAsSupervised(image)
|
||||
.then =>
|
||||
@inspectByName(imageName)
|
||||
.catch =>
|
||||
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', download_progress: 0 }))
|
||||
Promise.try =>
|
||||
if validation.checkTruthy(opts.delta)
|
||||
@logger.logSystemEvent(logTypes.downloadImageDelta, { image })
|
||||
@docker.rsyncImageWithProgress(imageName, opts, onProgress)
|
||||
else
|
||||
@logger.logSystemEvent(logTypes.downloadImage, { image })
|
||||
@docker.fetchImageWithProgress(imageName, opts, onProgress)
|
||||
.then =>
|
||||
@logger.logSystemEvent(logTypes.downloadImageSuccess, { image })
|
||||
@inspectByName(imageName)
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.downloadImageError, { image, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
@reportChange(image.imageId)
|
||||
|
||||
format: (image) ->
|
||||
image.serviceId ?= null
|
||||
image.serviceName ?= null
|
||||
image.imageId ?= null
|
||||
image.releaseId ?= null
|
||||
image.dependent ?= false
|
||||
return _.omit(image, 'id')
|
||||
|
||||
markAsSupervised: (image) =>
|
||||
image = @format(image)
|
||||
@db.upsertModel('image', image, image)
|
||||
|
||||
update: (image) =>
|
||||
image = @format(image)
|
||||
@db.models('image').update(image).where(name: image.name)
|
||||
|
||||
_removeImageIfNotNeeded: (image) =>
|
||||
removed = true
|
||||
@inspectByName(image.name)
|
||||
.catch ImageNotFoundError, (err) ->
|
||||
removed = false
|
||||
return null
|
||||
.then (img) =>
|
||||
if img?
|
||||
@db.models('image').where(name: image.name).select()
|
||||
.then (imagesFromDB) =>
|
||||
if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(image))
|
||||
@docker.getImage(image.name).remove(force: true)
|
||||
.then ->
|
||||
return removed
|
||||
|
||||
remove: (image) =>
|
||||
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Deleting' }))
|
||||
@logger.logSystemEvent(logTypes.deleteImage, { image })
|
||||
@_removeImageIfNotNeeded(image)
|
||||
.tap =>
|
||||
@db.models('image').del().where(image)
|
||||
.then (removed) =>
|
||||
if removed
|
||||
@logger.logSystemEvent(logTypes.deleteImageSuccess, { image })
|
||||
else
|
||||
@logger.logSystemEvent(logTypes.imageAlreadyDeleted, { image })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.deleteImageError, { image, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
@reportChange(image.imageId)
|
||||
|
||||
getNormalisedTags: (image) ->
|
||||
Promise.map(image.RepoTags ? [], (tag) => @normalise(tag))
|
||||
|
||||
_withImagesFromDockerAndDB: (callback) =>
|
||||
Promise.join(
|
||||
@docker.listImages()
|
||||
.map (image) =>
|
||||
image.NormalisedRepoTags = @getNormalisedTags(image)
|
||||
Promise.props(image)
|
||||
@db.models('image').select()
|
||||
callback
|
||||
)
|
||||
|
||||
_isAvailableInDocker: (image, dockerImages) ->
|
||||
_.some dockerImages, (dockerImage) ->
|
||||
_.includes(dockerImage.NormalisedRepoTags, image.name)
|
||||
|
||||
# Gets all images that are supervised, in an object containing name, appId, serviceId, serviceName, imageId, dependent.
|
||||
getAvailable: =>
|
||||
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
|
||||
return _.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages))
|
||||
|
||||
cleanupDatabase: =>
|
||||
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
|
||||
return _.filter(supervisedImages, (image) => !@_isAvailableInDocker(image, dockerImages))
|
||||
.then (imagesToRemove) =>
|
||||
ids = _.map(imagesToRemove, 'id')
|
||||
@db.models('image').del().whereIn('id', ids)
|
||||
|
||||
getStatus: =>
|
||||
@getAvailable()
|
||||
.map (image) ->
|
||||
image.status = 'Downloaded'
|
||||
image.download_progress = null
|
||||
return image
|
||||
.then (images) =>
|
||||
status = _.clone(@volatileState)
|
||||
_.forEach images, (image) ->
|
||||
status[image.imageId] ?= image
|
||||
return _.values(status)
|
||||
|
||||
_getDanglingAndOldSupervisorsForCleanup: =>
|
||||
images = []
|
||||
@docker.getRegistryAndName(constants.supervisorImage)
|
||||
.then (supervisorImageInfo) =>
|
||||
@docker.listImages()
|
||||
.map (image) =>
|
||||
Promise.map image.RepoTags ? [], (repoTag) =>
|
||||
@docker.getRegistryAndName(repoTag)
|
||||
.then ({ imageName, tagName }) ->
|
||||
if imageName == supervisorImageInfo.imageName and tagName != supervisorImageInfo.tagName
|
||||
images.push(repoTag)
|
||||
.then =>
|
||||
@docker.listImages(filters: { dangling: [ 'true' ] })
|
||||
.map (image) ->
|
||||
images.push(image.Id)
|
||||
.then =>
|
||||
return _.filter images, (image) =>
|
||||
!@imageCleanupFailures[image]? or Date.now() - @imageCleanupFailures[image] > 3600 * 1000
|
||||
|
||||
inspectByName: (imageName) =>
|
||||
@docker.getImage(imageName).inspect()
|
||||
|
||||
normalise: (imageName) =>
|
||||
@docker.normaliseImageName(imageName)
|
||||
|
||||
isCleanupNeeded: =>
|
||||
@_getDanglingAndOldSupervisorsForCleanup()
|
||||
.then (imagesForCleanup) ->
|
||||
return !_.isEmpty(imagesForCleanup)
|
||||
|
||||
# Delete old supervisor images and dangling images
|
||||
cleanup: =>
|
||||
@_getDanglingAndOldSupervisorsForCleanup()
|
||||
.map (image) =>
|
||||
@docker.getImage(image).remove(force: true)
|
||||
.then =>
|
||||
delete @imageCleanupFailures[image]
|
||||
.catch (err) =>
|
||||
@logger.logSystemMessage("Error cleaning up #{image}: #{err.message} - will ignore for 1 hour", { error: err }, 'Image cleanup error')
|
||||
@imageCleanupFailures[image] = Date.now()
|
56
src/compose/networks.coffee
Normal file
56
src/compose/networks.coffee
Normal file
@ -0,0 +1,56 @@
|
||||
Promise = require 'bluebird'
|
||||
_ = require 'lodash'
|
||||
|
||||
logTypes = require '../lib/log-types'
|
||||
|
||||
module.exports = class Networks
|
||||
constructor: ({ @docker, @logger }) ->
|
||||
|
||||
format: (network) ->
|
||||
return {
|
||||
appId: network.Labels['io.resin.appId']
|
||||
name: network.Name
|
||||
config: {}
|
||||
}
|
||||
|
||||
getAll: =>
|
||||
@docker.listNetworks(filters: label: [ 'io.resin.supervised' ])
|
||||
.then (networks) =>
|
||||
Promise.map networks, (network) =>
|
||||
@docker.getNetwork(network.Name).inspect()
|
||||
.then (net) =>
|
||||
@format(net)
|
||||
|
||||
getAllByAppId: (appId) =>
|
||||
@getAll()
|
||||
.then (networks) ->
|
||||
_.filter(networks, (v) -> v.appId == appId)
|
||||
|
||||
get: (name) ->
|
||||
@docker.getNetwork(name).inspect()
|
||||
.then (network) ->
|
||||
return @format(network)
|
||||
|
||||
# TODO: what config values are relevant/whitelisted?
|
||||
create: ({ name, config, appId }) =>
|
||||
@logger.logSystemEvent(logTypes.createNetwork, { network: { name } })
|
||||
@docker.createNetwork({
|
||||
Name: name
|
||||
Labels: {
|
||||
'io.resin.supervised': 'true'
|
||||
'io.resin.appId': appId
|
||||
}
|
||||
})
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.createNetworkError, { network: { name }, error: err })
|
||||
throw err
|
||||
|
||||
remove: ({ name }) ->
|
||||
@logger.logSystemEvent(logTypes.removeNetwork, { network: { name } })
|
||||
@docker.getNetwork(name).remove()
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.removeNetworkError, { network: { name }, error: err })
|
||||
throw err
|
||||
|
||||
isEqualConfig: (current, target) ->
|
||||
return true
|
274
src/compose/service-manager.coffee
Normal file
274
src/compose/service-manager.coffee
Normal file
@ -0,0 +1,274 @@
|
||||
Promise = require 'bluebird'
|
||||
_ = require 'lodash'
|
||||
EventEmitter = require 'events'
|
||||
JSONParser = require 'jsonparse'
|
||||
fs = Promise.promisifyAll(require('fs'))
|
||||
|
||||
logTypes = require '../lib/log-types'
|
||||
{ checkInt } = require '../lib/validation'
|
||||
constants = require '../lib/constants'
|
||||
|
||||
Service = require './service'
|
||||
#restartVars = (conf) ->
|
||||
# return _.pick(conf, [ 'RESIN_DEVICE_RESTART', 'RESIN_RESTART' ])
|
||||
|
||||
module.exports = class ServiceManager extends EventEmitter
|
||||
constructor: ({ @docker, @logger, @config }) ->
|
||||
@containerHasDied = {}
|
||||
@listening = false
|
||||
# Volatile state of containers, indexed by containerId (or random strings if we don't have a containerId yet)
|
||||
@volatileState = {}
|
||||
|
||||
killAllLegacy: =>
|
||||
# Containers haven't been normalized (this is an updated supervisor)
|
||||
# so we need to stop and remove them
|
||||
@docker.getImage(constants.supervisorImage).inspect()
|
||||
.then (supervisorImage) =>
|
||||
Promise.map(@docker.listContainers(all: true), (container) =>
|
||||
@docker.getImage(container.Image).inspect()
|
||||
.then (containerImage) =>
|
||||
if containerImage.Id != supervisorImage.Id
|
||||
@_killContainer(container.Id, { serviceName: 'legacy', image: container.Image }, { removeContainer: true })
|
||||
)
|
||||
|
||||
reportChange: (containerId, status) ->
|
||||
if status?
|
||||
@volatileState[containerId] ?= {}
|
||||
_.merge(@volatileState[containerId], status)
|
||||
@emit('change')
|
||||
else if containerId? and @volatileState[containerId]?
|
||||
delete @volatileState[containerId]
|
||||
@emit('change')
|
||||
|
||||
reportNewStatus: (containerId, service, status) =>
|
||||
@reportChange(containerId, _.merge({ status }, _.pick(service, [ 'imageId', 'appId', 'releaseId', 'commit' ])))
|
||||
|
||||
_killContainer: (containerId, service = {}, { removeContainer = true }) =>
|
||||
@logger.logSystemEvent(logTypes.stopService, { service })
|
||||
@reportNewStatus(containerId, service, 'Stopping') if service.imageId?
|
||||
containerObj = @docker.getContainer(containerId)
|
||||
containerObj.stop(t: 10)
|
||||
.then ->
|
||||
containerObj.remove(v: true) if removeContainer
|
||||
.catch (err) =>
|
||||
# Get the statusCode from the original cause and make sure statusCode its definitely a string for comparison
|
||||
# reasons.
|
||||
statusCode = '' + err.statusCode
|
||||
# 304 means the container was already stopped - so we can just remove it
|
||||
if statusCode is '304'
|
||||
@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
|
||||
if removeContainer
|
||||
return containerObj.remove(v: true)
|
||||
return
|
||||
# 404 means the container doesn't exist, precisely what we want! :D
|
||||
if statusCode is '404'
|
||||
@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
|
||||
return
|
||||
throw err
|
||||
.tap =>
|
||||
delete @containerHasDied[containerId] if @containerHasDied[containerId]?
|
||||
.tap =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
@reportChange(containerId) if service.imageId?
|
||||
|
||||
kill: (service, { removeContainer = true } = {}) =>
|
||||
@_killContainer(service.containerId, service, { removeContainer })
|
||||
.then ->
|
||||
service.running = false
|
||||
return service
|
||||
|
||||
getAllByAppId: (appId) =>
|
||||
@getAll("io.resin.app_id=#{appId}")
|
||||
.filter (service) ->
|
||||
service.appId == appId
|
||||
|
||||
stopAllByAppId: (appId) =>
|
||||
Promise.map @getAllByAppId(appId), (service) =>
|
||||
@kill(service, { removeContainer: false })
|
||||
|
||||
create: (service) =>
|
||||
mockContainerId = @config.newUniqueKey()
|
||||
@get(service)
|
||||
.then ([ existingService ]) =>
|
||||
return @docker.getContainer(existingService.containerId) if existingService?
|
||||
conf = service.toContainerConfig()
|
||||
@logger.logSystemEvent(logTypes.installService, { service })
|
||||
@reportNewStatus(mockContainerId, service, 'Installing')
|
||||
@docker.createContainer(conf)
|
||||
.tap (container) =>
|
||||
service.containerId = container.id
|
||||
@logger.logSystemEvent(logTypes.installServiceSuccess, { service })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.installServiceError, { service, error: err })
|
||||
throw err
|
||||
.finally =>
|
||||
@reportChange(mockContainerId)
|
||||
|
||||
start: (service) =>
|
||||
alreadyStarted = false
|
||||
containerId = null
|
||||
@create(service)
|
||||
.tap (container) =>
|
||||
containerId = container.id
|
||||
@logger.logSystemEvent(logTypes.startService, { service })
|
||||
@reportNewStatus(containerId, service, 'Starting')
|
||||
container.start()
|
||||
.catch (err) =>
|
||||
statusCode = '' + err.statusCode
|
||||
# 304 means the container was already started, precisely what we want :)
|
||||
if statusCode is '304'
|
||||
alreadyStarted = true
|
||||
return
|
||||
|
||||
if statusCode is '500' and err.json.trim().match(/exec format error$/)
|
||||
# Provide a friendlier error message for "exec format error"
|
||||
@config.get('deviceType')
|
||||
.then (deviceType) ->
|
||||
throw new Error("Application architecture incompatible with #{deviceType}: exec format error")
|
||||
else
|
||||
# rethrow the same error
|
||||
throw err
|
||||
.catch (err) ->
|
||||
# If starting the container failed, we remove it so that it doesn't litter
|
||||
container.remove(v: true)
|
||||
.finally =>
|
||||
@logger.logSystemEvent(logTypes.startServiceError, { service, error: err })
|
||||
throw err
|
||||
.then =>
|
||||
@logger.attach(@docker, container.id, service.serviceId)
|
||||
.tap =>
|
||||
if alreadyStarted
|
||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
||||
else
|
||||
@logger.logSystemEvent(logTypes.startServiceSuccess, { service })
|
||||
.then (container) ->
|
||||
service.running = true
|
||||
.finally =>
|
||||
@reportChange(containerId)
|
||||
|
||||
# Gets all existing containers that correspond to apps
|
||||
getAll: (extraLabelFilters = []) =>
|
||||
filters = label: [ 'io.resin.supervised' ].concat(extraLabelFilters)
|
||||
@docker.listContainers({ all: true, filters })
|
||||
.then (containers) =>
|
||||
Promise.mapSeries containers, (container) =>
|
||||
@docker.getContainer(container.Id).inspect()
|
||||
.then(Service.fromContainer)
|
||||
|
||||
# Returns an array with the container(s) matching a service by appId, commit, image and environment
|
||||
get: (service) =>
|
||||
@getAll("io.resin.service_id=#{service.serviceId}")
|
||||
.then (services = []) ->
|
||||
return _.filter services, (currentService) ->
|
||||
currentService.isSameContainer(service)
|
||||
|
||||
getStatus: =>
|
||||
@getAll()
|
||||
.then (services) =>
|
||||
status = _.clone(@volatileState)
|
||||
_.forEach services, (service) ->
|
||||
status[service.containerId] ?= _.pick(service, [ 'appId', 'imageId', 'status', 'releaseId', 'commit' ])
|
||||
return _.values(status)
|
||||
|
||||
getByDockerContainerId: (containerId) =>
|
||||
@docker.getContainer(containerId).inspect()
|
||||
.then (container) ->
|
||||
return Service.fromContainer(container)
|
||||
.catchReturn(null)
|
||||
|
||||
waitToKill: (service, timeout) ->
|
||||
startTime = Date.now()
|
||||
pollInterval = 100
|
||||
timeout = checkInt(timeout, positive: true) ? 60000
|
||||
checkFileOrTimeout = ->
|
||||
killme = service.killmeFullPathOnHost()
|
||||
fs.statAsync(killme)
|
||||
.catch (err) ->
|
||||
throw err unless (Date.now() - startTime) > timeout
|
||||
.then ->
|
||||
fs.unlinkAsync(killme).catch(_.noop)
|
||||
retryCheck = ->
|
||||
checkFileOrTimeout()
|
||||
.catch ->
|
||||
Promise.delay(pollInterval).then(retryCheck)
|
||||
retryCheck()
|
||||
|
||||
prepareForHandover: (service) =>
|
||||
@get(service)
|
||||
.then ([ svc ]) =>
|
||||
container = @docker.getContainer(svc.containerId)
|
||||
container.update(RestartPolicy: {})
|
||||
.then ->
|
||||
container.rename(name: "old_#{service.serviceName}_#{service.releaseId}")
|
||||
|
||||
updateReleaseId: (service, releaseId) =>
|
||||
@get(service)
|
||||
.then ([ svc ]) =>
|
||||
@docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{releaseId}")
|
||||
|
||||
handover: (currentService, targetService) =>
|
||||
# We set the running container to not restart so that in case of a poweroff
|
||||
# it doesn't come back after boot.
|
||||
@prepareForHandover(currentService)
|
||||
.then =>
|
||||
@start(targetService)
|
||||
.then =>
|
||||
@waitToKill(currentService, targetService.labels['io.resin.update.handover_timeout'])
|
||||
.then =>
|
||||
@kill(currentService)
|
||||
|
||||
listenToEvents: =>
|
||||
return if @listening
|
||||
@listening = true
|
||||
@docker.getEvents(filters: type: [ 'container' ])
|
||||
.then (stream) =>
|
||||
parser = new JSONParser()
|
||||
parser.onError = (err) ->
|
||||
console.error('Error on docker events stream', err, err.stack)
|
||||
stream.destroy()
|
||||
parser.onValue = (data) =>
|
||||
return if parser.stack.length != 0
|
||||
if data?.status in ['die', 'start']
|
||||
setImmediate =>
|
||||
@getByDockerContainerId(data.id)
|
||||
.then (service) =>
|
||||
if service?
|
||||
if data.status == 'die'
|
||||
@logger.logSystemEvent(logTypes.serviceExit, { service })
|
||||
@containerHasDied[data.id] = true
|
||||
else if data.status == 'start' and @containerHasDied[data.id]
|
||||
@logger.logSystemEvent(logTypes.serviceRestart, { service })
|
||||
@logger.attach(@docker, data.id, service.serviceId)
|
||||
.catch (err) ->
|
||||
console.error('Error on docker event:', err, err.stack)
|
||||
new Promise (resolve, reject) ->
|
||||
stream
|
||||
.on 'error', (err) ->
|
||||
console.error('Error on docker events stream', err)
|
||||
reject()
|
||||
.on 'data', (data) ->
|
||||
if typeof data is 'string'
|
||||
data = new Buffer(data)
|
||||
parser.write(data)
|
||||
.on 'end', ->
|
||||
console.error('Docker events stream ended, restarting listener')
|
||||
resolve()
|
||||
.finally =>
|
||||
@listening = false
|
||||
setImmediate( => @listenToEvents())
|
||||
.catch (err) =>
|
||||
console.error('Error starting to listen to events:', err, err.stack)
|
||||
setTimeout =>
|
||||
@listenToEvents()
|
||||
, 1000
|
||||
return
|
||||
|
||||
attachToRunning: =>
|
||||
@getAll()
|
||||
.map (service) =>
|
||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
||||
@logger.attach(@docker, service.containerId, service.serviceId)
|
312
src/compose/service.coffee
Normal file
312
src/compose/service.coffee
Normal file
@ -0,0 +1,312 @@
|
||||
_ = require 'lodash'
|
||||
path = require 'path'
|
||||
{ checkTruthy, checkInt } = require '../lib/validation'
|
||||
updateLock = require '../lib/update-lock'
|
||||
constants = require '../lib/constants'
|
||||
conversions = require '../lib/conversions'
|
||||
|
||||
validRestartPolicies = [ 'no', 'always', 'on-failure', 'unless-stopped' ]
|
||||
|
||||
# Construct a restart policy based on its name.
|
||||
# The default policy (if name is not a valid policy) is "always".
|
||||
createRestartPolicy = (name) ->
|
||||
if not (name in validRestartPolicies)
|
||||
name = 'unless-stopped'
|
||||
return { Name: name, MaximumRetryCount: 0 }
|
||||
|
||||
getCommand = (service, imageInfo) ->
|
||||
if service.command?
|
||||
return service.command
|
||||
else if imageInfo?.Config?.Cmd
|
||||
return imageInfo.Config.Cmd
|
||||
|
||||
getEntrypoint = (service, imageInfo) ->
|
||||
if service.entrypoint?
|
||||
return service.entrypoint
|
||||
else if imageInfo?.Config?.Entrypoint
|
||||
return imageInfo.Config.Entrypoint
|
||||
|
||||
killmePath = (appId, serviceName) ->
|
||||
return updateLock.lockPath(appId, serviceName)
|
||||
|
||||
defaultBinds = (appId, serviceName) ->
|
||||
return [
|
||||
"#{updateLock.lockPath(appId, serviceName)}:/tmp/resin"
|
||||
]
|
||||
|
||||
module.exports = class Service
|
||||
constructor: (serviceProperties, opts = {}) ->
|
||||
{
|
||||
@image
|
||||
@expose
|
||||
@ports
|
||||
@network_mode
|
||||
@privileged
|
||||
@releaseId
|
||||
@imageId
|
||||
@serviceId
|
||||
@appId
|
||||
@serviceName
|
||||
@containerId
|
||||
@running
|
||||
@createdAt
|
||||
@environment
|
||||
@command
|
||||
@entrypoint
|
||||
@labels
|
||||
@volumes
|
||||
@restartPolicy
|
||||
@depends_on
|
||||
@cap_add
|
||||
@cap_drop
|
||||
@commit
|
||||
@status
|
||||
} = serviceProperties
|
||||
@privileged ?= false
|
||||
@volumes ?= []
|
||||
@labels ?= {}
|
||||
@environment ?= {}
|
||||
@running ?= true
|
||||
@ports ?= []
|
||||
@expose ?= []
|
||||
@cap_add ?= []
|
||||
@cap_drop ?= []
|
||||
@network_mode ?= @appId.toString()
|
||||
if @releaseId?
|
||||
@releaseId = @releaseId.toString()
|
||||
|
||||
# If the service has no containerId, it is a target service and has to be normalised and extended
|
||||
if !@containerId?
|
||||
@restartPolicy = createRestartPolicy(serviceProperties.restart)
|
||||
@command = getCommand(serviceProperties, opts.imageInfo)
|
||||
@entrypoint = getEntrypoint(serviceProperties, opts.imageInfo)
|
||||
@extendEnvVars(opts)
|
||||
@extendLabels(opts.imageInfo)
|
||||
@extendAndSanitiseVolumes(opts.imageInfo)
|
||||
|
||||
if checkTruthy(@labels['io.resin.features.dbus'])
|
||||
@volumes.push('/run/dbus:/host/run/dbus')
|
||||
if checkTruthy(@labels['io.resin.features.kernel_modules'])
|
||||
@volumes.push('/lib/modules:/lib/modules')
|
||||
if checkTruthy(@labels['io.resin.features.firmware'])
|
||||
@volumes.push('/lib/firmware:/lib/firmware')
|
||||
if checkTruthy(@labels['io.resin.features.supervisor_api'])
|
||||
@environment['RESIN_SUPERVISOR_HOST'] = opts.supervisorApiHost
|
||||
@environment['RESIN_SUPERVISOR_PORT'] = opts.listenPort.toString()
|
||||
@environment['RESIN_SUPERVISOR_ADDRESS'] = "http://#{opts.supervisorApiHost}:#{opts.listenPort}"
|
||||
@environment['RESIN_SUPERVISOR_API_KEY'] = opts.apiSecret
|
||||
if checkTruthy(@labels['io.resin.features.resin_api'])
|
||||
@environment['RESIN_API_KEY'] = opts.deviceApiKey
|
||||
|
||||
extendEnvVars: ({ imageInfo, uuid, appName, commit, name, version, deviceType, osVersion }) =>
|
||||
newEnv =
|
||||
RESIN_APP_ID: @appId.toString()
|
||||
RESIN_APP_NAME: appName
|
||||
RESIN_APP_COMMIT: commit
|
||||
RESIN_APP_RELEASE: @releaseId.toString()
|
||||
RESIN_SERVICE_NAME: @serviceName
|
||||
RESIN_DEVICE_UUID: uuid
|
||||
RESIN_DEVICE_NAME_AT_INIT: name
|
||||
RESIN_DEVICE_TYPE: deviceType
|
||||
RESIN_HOST_OS_VERSION: osVersion
|
||||
RESIN_SUPERVISOR_VERSION: version
|
||||
RESIN_APP_LOCK_PATH: '/tmp/resin/resin-updates.lock'
|
||||
RESIN_SERVICE_KILL_ME_PATH: '/tmp/resin/resin-kill-me'
|
||||
RESIN: '1'
|
||||
USER: 'root'
|
||||
if @environment?
|
||||
_.defaults(newEnv, @environment)
|
||||
_.defaults(newEnv, conversions.envArrayToObject(imageInfo?.Config?.Env ? []))
|
||||
@environment = newEnv
|
||||
return @environment
|
||||
|
||||
extendLabels: (imageInfo) =>
|
||||
@labels = _.clone(@labels)
|
||||
_.defaults(@labels, imageInfo?.Config?.Labels ? {})
|
||||
@labels['io.resin.supervised'] = 'true'
|
||||
@labels['io.resin.app_id'] = @appId.toString()
|
||||
@labels['io.resin.service_id'] = @serviceId.toString()
|
||||
@labels['io.resin.service_name'] = @serviceName
|
||||
@labels['io.resin.image_id'] = @imageId.toString()
|
||||
@labels['io.resin.commit'] = @commit
|
||||
return @labels
|
||||
|
||||
extendAndSanitiseVolumes: (imageInfo) =>
|
||||
volumes = []
|
||||
_.forEach @volumes, (vol) ->
|
||||
isBind = /:/.test(vol)
|
||||
if isBind
|
||||
bindSource = vol.split(':')[0]
|
||||
if !path.isAbsolute(bindSource)
|
||||
volumes.push(vol)
|
||||
else
|
||||
console.log("Ignoring invalid bind mount #{vol}")
|
||||
else
|
||||
volumes.push(vol)
|
||||
volumes = volumes.concat(@defaultBinds())
|
||||
volumes = _.union(_.keys(imageInfo?.Config?.Volumes), volumes)
|
||||
@volumes = volumes
|
||||
return @volumes
|
||||
|
||||
getNamedVolumes: =>
|
||||
defaults = @defaultBinds()
|
||||
validVolumes = _.map @volumes, (vol) ->
|
||||
return null if _.includes(defaults, vol)
|
||||
return null if !/:/.test(vol)
|
||||
bindSource = vol.split(':')[0]
|
||||
if !path.isAbsolute(bindSource)
|
||||
return bindSource
|
||||
else
|
||||
return null
|
||||
return _.filter(validVolumes, (v) -> !_.isNull(v))
|
||||
|
||||
lockPath: =>
|
||||
return updateLock.lockPath(@appId)
|
||||
|
||||
killmePath: =>
|
||||
return killmePath(@appId, @serviceName)
|
||||
|
||||
killmeFullPathOnHost: =>
|
||||
return "#{constants.rootMountPoint}#{@killmePath()}/resin-kill-me"
|
||||
|
||||
defaultBinds: ->
|
||||
return defaultBinds(@appId, @serviceName)
|
||||
|
||||
@fromContainer: (container) ->
|
||||
if container.State.Running
|
||||
status = 'Running'
|
||||
else if container.State.Status == 'created'
|
||||
status = 'Installed'
|
||||
else
|
||||
status = 'Stopped'
|
||||
|
||||
boundContainerPorts = []
|
||||
ports = []
|
||||
expose = []
|
||||
_.forEach container.HostConfig.PortBindings, (conf, port) ->
|
||||
containerPort = port.match(/^([0-9]*)\/tcp$/)?[1]
|
||||
if containerPort?
|
||||
boundContainerPorts.push(containerPort)
|
||||
hostPort = conf[0]?.HostPort
|
||||
if !_.isEmpty(hostPort)
|
||||
ports.push("#{hostPort}:#{containerPort}")
|
||||
else
|
||||
ports.push(containerPort)
|
||||
_.forEach container.Config.ExposedPorts, (conf, port) ->
|
||||
containerPort = port.match(/^([0-9]*)\/tcp$/)?[1]
|
||||
if containerPort? and !_.includes(boundContainerPorts, containerPort)
|
||||
expose.push(containerPort)
|
||||
|
||||
appId = container.Config.Labels['io.resin.app_id']
|
||||
serviceId = container.Config.Labels['io.resin.service_id']
|
||||
serviceName = container.Config.Labels['io.resin.service_name']
|
||||
releaseId = container.Name.match(/.*_(\d+)$/)?[1]
|
||||
service = {
|
||||
appId: appId
|
||||
serviceId: serviceId
|
||||
serviceName: serviceName
|
||||
imageId: checkInt(container.Config.Labels['io.resin.image_id'])
|
||||
command: container.Config.Cmd
|
||||
entrypoint: container.Config.Entrypoint
|
||||
network_mode: container.HostConfig.NetworkMode
|
||||
volumes: _.concat(container.HostConfig.Binds ? [], _.keys(container.Config.Volumes ? {}))
|
||||
image: container.Config.Image
|
||||
environment: conversions.envArrayToObject(container.Config.Env)
|
||||
privileged: container.HostConfig.Privileged
|
||||
releaseId: releaseId
|
||||
commit: container.Config.Labels['io.resin.commit']
|
||||
labels: container.Config.Labels
|
||||
running: container.State.Running
|
||||
createdAt: new Date(container.Created)
|
||||
restartPolicy: container.HostConfig.RestartPolicy
|
||||
ports: ports
|
||||
expose: expose
|
||||
containerId: container.Id
|
||||
cap_add: container.HostConfig.CapAdd
|
||||
cap_drop: container.HostConfig.CapDrop
|
||||
status
|
||||
}
|
||||
return new Service(service)
|
||||
|
||||
# TODO: map ports for any of the possible formats "container:host/protocol", port ranges, etc.
|
||||
getPortsAndPortBindings: =>
|
||||
ports = {}
|
||||
portBindings = {}
|
||||
if @ports?
|
||||
_.forEach @ports, (port) ->
|
||||
[ hostPort, containerPort ] = port.split(':')
|
||||
ports[containerPort + '/tcp'] = {}
|
||||
portBindings[containerPort + '/tcp'] = [ HostPort: hostPort ]
|
||||
if @expose?
|
||||
_.forEach @expose, (port) ->
|
||||
ports[port + '/tcp'] = {}
|
||||
return { ports, portBindings }
|
||||
|
||||
getBindsAndVolumes: =>
|
||||
binds = []
|
||||
volumes = {}
|
||||
_.forEach @volumes, (vol) ->
|
||||
isBind = /:/.test(vol)
|
||||
if isBind
|
||||
binds.push(vol)
|
||||
else
|
||||
volumes[vol] = {}
|
||||
return { binds, volumes }
|
||||
|
||||
toContainerConfig: =>
|
||||
{ ports, portBindings } = @getPortsAndPortBindings()
|
||||
{ binds, volumes } = @getBindsAndVolumes()
|
||||
|
||||
conf = {
|
||||
name: "#{@serviceName}_#{@releaseId}"
|
||||
Image: @image
|
||||
Cmd: @command
|
||||
Entrypoint: @entrypoint
|
||||
Tty: true
|
||||
Volumes: volumes
|
||||
Env: _.map @environment, (v, k) -> k + '=' + v
|
||||
ExposedPorts: ports
|
||||
Labels: @labels
|
||||
HostConfig:
|
||||
Privileged: @privileged
|
||||
NetworkMode: @network_mode
|
||||
PortBindings: portBindings
|
||||
Binds: binds
|
||||
RestartPolicy: @restartPolicy
|
||||
CapAdd: @cap_add
|
||||
CapDrop: @cap_drop
|
||||
}
|
||||
# If network mode is the default network for this app, add alias for serviceName
|
||||
if @network_mode == @appId.toString()
|
||||
conf.NetworkingConfig = {
|
||||
EndpointsConfig: {
|
||||
"#{@appId}": {
|
||||
Aliases: [ @serviceName ]
|
||||
}
|
||||
}
|
||||
}
|
||||
return conf
|
||||
|
||||
isSameContainer: (otherService) =>
|
||||
propertiesToCompare = [
|
||||
'image'
|
||||
'imageId'
|
||||
'network_mode'
|
||||
'privileged'
|
||||
'restartPolicy'
|
||||
'labels'
|
||||
'environment'
|
||||
'cap_add'
|
||||
'cap_drop'
|
||||
]
|
||||
arraysToCompare = [
|
||||
'ports'
|
||||
'expose'
|
||||
'volumes'
|
||||
]
|
||||
return _.isEqual(_.pick(this, propertiesToCompare), _.pick(otherService, propertiesToCompare)) and
|
||||
_.every arraysToCompare, (property) =>
|
||||
_.isEmpty(_.xorWith(this[property], otherService[property], _.isEqual))
|
||||
|
||||
isEqual: (otherService) =>
|
||||
return @isSameContainer(otherService) and @running == otherService.running and @releaseId == otherService.releaseId
|
108
src/compose/volumes.coffee
Normal file
108
src/compose/volumes.coffee
Normal file
@ -0,0 +1,108 @@
|
||||
Promise = require 'bluebird'
|
||||
_ = require 'lodash'
|
||||
fs = Promise.promisifyAll(require('fs'))
|
||||
ncp = Promise.promisify(require('ncp').ncp)
|
||||
rimraf = Promise.promisify(require('rimraf'))
|
||||
exec = Promise.promisify(require('child_process').exec)
|
||||
ncp.limit = 16
|
||||
|
||||
logTypes = require '../lib/log-types'
|
||||
migration = require '../lib/migration'
|
||||
constants = require '../lib/constants'
|
||||
|
||||
ENOENT = (err) -> err.code is 'ENOENT'
|
||||
|
||||
module.exports = class Volumes
|
||||
constructor: ({ @docker, @logger }) ->
|
||||
|
||||
format: (volume) ->
|
||||
appId = volume.Labels['io.resin.app_id']
|
||||
return {
|
||||
name: volume.Name
|
||||
appId
|
||||
config: {
|
||||
labels: _.omit(volume.Labels, _.keys(@defaultLabels(appId)))
|
||||
driver_opts: volume.Options
|
||||
}
|
||||
}
|
||||
|
||||
getAll: =>
|
||||
@docker.listVolumes(filters: label: [ 'io.resin.supervised' ])
|
||||
.then (response) =>
|
||||
volumes = response.Volumes ? []
|
||||
Promise.map volumes, (volume) =>
|
||||
@docker.getVolume(volume.Name).inspect()
|
||||
.then (vol) =>
|
||||
@format(vol)
|
||||
|
||||
getAllByAppId: (appId) =>
|
||||
@getAll()
|
||||
.then (volumes) ->
|
||||
_.filter(volumes, (v) -> v.appId == appId)
|
||||
|
||||
get: (name) ->
|
||||
@docker.getVolume(name).inspect()
|
||||
.then (volume) ->
|
||||
return @format(volume)
|
||||
|
||||
defaultLabels: (appId) ->
|
||||
return {
|
||||
'io.resin.supervised': 'true'
|
||||
'io.resin.app_id': appId
|
||||
}
|
||||
|
||||
# TODO: what config values are relevant/whitelisted?
|
||||
create: ({ name, config = {}, appId }) =>
|
||||
@logger.logSystemEvent(logTypes.createVolume, { volume: { name } })
|
||||
labels = _.clone(config.labels) ? {}
|
||||
_.assign(labels, @defaultLabels(appId))
|
||||
driverOpts = config.driver_opts ? {}
|
||||
@docker.createVolume({
|
||||
Name: name
|
||||
Labels: labels
|
||||
DriverOpts: driverOpts
|
||||
})
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.createVolumeError, { volume: { name }, error: err })
|
||||
throw err
|
||||
|
||||
createFromLegacy: (appId) =>
|
||||
name = migration.defaultLegacyVolume(appId)
|
||||
@create({ name, appId })
|
||||
.then (v) ->
|
||||
v.inspect()
|
||||
.then (v) ->
|
||||
volumePath = "#{constants.rootMountPoint}#{v.Mountpoint}"
|
||||
legacyPath = "/mnt/root/resin-data/#{appId}"
|
||||
fs.lstatAsync(legacyPath)
|
||||
.catch ENOENT, (err) ->
|
||||
fs.lstatAsync(legacyPath + '-old')
|
||||
.then ->
|
||||
rimraf(legacyPath + '-old')
|
||||
.finally ->
|
||||
throw err
|
||||
.then ->
|
||||
ncp(legacyPath, volumePath)
|
||||
.then ->
|
||||
exec('sync')
|
||||
.then ->
|
||||
# Before deleting, we rename so that if there's an unexpected poweroff
|
||||
# next time we won't copy a partially deleted folder into the volume
|
||||
fs.renameAsync(legacyPath, legacyPath + '-old')
|
||||
.then ->
|
||||
rimraf(legacyPath + '-old')
|
||||
.catch (err) ->
|
||||
console.log("Ignoring legacy data volume migration due to #{err}")
|
||||
|
||||
remove: ({ name }) ->
|
||||
@logger.logSystemEvent(logTypes.removeVolume, { volume: { name } })
|
||||
@docker.getVolume(name).remove()
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.removeVolumeError, { volume: { name }, error: err })
|
||||
|
||||
isEqualConfig: (current, target) ->
|
||||
currentOpts = current.driver_opts ? {}
|
||||
targetOpts = target.driver_opts ? {}
|
||||
currentLabels = current.labels ? {}
|
||||
targetLabels = target.labels ? {}
|
||||
return _.isEqual(currentLabels, targetLabels) and _.isEqual(currentOpts, targetOpts)
|
Loading…
x
Reference in New Issue
Block a user