Switch to a new image management system that keeps the docker image ID in the database, allowing deltas and proper comparison for images that have a digest.

Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
Pablo Carranza Velez 2017-12-14 21:15:59 -08:00
parent d84bcf0fb4
commit 3a710506a6
7 changed files with 182 additions and 92 deletions
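The central data change in this commit is that each tracked image row also records the docker image ID. Going by the record shape documented in images.coffee below and the new migration, a tracked image looks roughly like the following sketch — all values here are illustrative, not taken from the commit:

exampleImage =
  name: 'some-registry/some-app@sha256:111'   # registry/repo@digest or registry/repo:tag
  appId: 1000
  serviceId: 10
  serviceName: 'main'
  imageId: 20                                 # image id from the resin API
  releaseId: 30
  dependent: 0
  dockerImageId: 'sha256:aaa'                 # new column added by the migration in this commit
  status: 'Downloaded'
  downloadProgress: null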

View File

@@ -10,6 +10,7 @@ process.env.DOCKER_HOST ?= "unix://#{constants.dockerSocket}"
Docker = require './lib/docker-utils'
updateLock = require './lib/update-lock'
{ checkTruthy, checkInt, checkString } = require './lib/validation'
{ NotFoundError } = require './lib/errors'
ServiceManager = require './compose/service-manager'
Service = require './compose/service'
@@ -28,7 +29,7 @@ serviceAction = (action, serviceId, current, target, options) ->
# TODO: move this to an Image class?
imageForService = (service) ->
return {
name: service.image
name: service.imageName
appId: service.appId
serviceId: service.serviceId
serviceName: service.serviceName
@@ -186,7 +187,7 @@ module.exports = class ApplicationManager extends EventEmitter
@services.kill(step.current)
.then =>
if step.options?.removeImage
@images.remove(imageForService(step.current))
@images.removeByDockerId(step.current.image)
updateMetadata: (step) =>
@services.updateMetadata(step.current, step.target)
purge: (step, { force = false } = {}) =>
@@ -238,8 +239,8 @@ module.exports = class ApplicationManager extends EventEmitter
@reportCurrentState(update_downloaded: true)
removeImage: (step) =>
@images.remove(step.image)
updateImage: (step) =>
@images.update(step.target)
saveImage: (step) =>
@images.save(step.image)
cleanup: (step) =>
@images.cleanup()
createNetworkOrVolume: (step) =>
@@ -431,6 +432,7 @@ module.exports = class ApplicationManager extends EventEmitter
target: targetServicesPerId[serviceId]
serviceId
})
return { removePairs, installPairs, updatePairs }
_compareNetworksOrVolumesForUpdate: (model, { current, target }, appId) ->
@@ -480,19 +482,6 @@ module.exports = class ApplicationManager extends EventEmitter
compareVolumesForUpdate: ({ current, target }, appId) =>
@_compareNetworksOrVolumesForUpdate(@volumes, { current, target }, appId)
# TODO: should we consider the case where several services use the same image?
# In such case we should do more complex matching to allow several image objects
# with the same name but different metadata. For now this shouldn't be necessary
# because the Resin model requires a different imageId and name for each service.
compareImagesForMetadataUpdate: (availableImages, targetServices) ->
pairs = []
targetImages = _.map(targetServices, imageForService)
for target in targetImages
imageWithSameContent = _.find(availableImages, (img) => @images.isSameImage(img, target))
if imageWithSameContent? and !_.find(availableImages, (img) -> _.isEqual(_.omit(img, 'id'), target))
pairs.push({ current: imageWithSameContent, target, serviceId: target.serviceId })
return pairs
# Checks if a service is using a network or volume that is about to be updated
_hasCurrentNetworksOrVolumes: (service, networkPairs, volumePairs) ->
if !service?
@@ -537,7 +526,7 @@ module.exports = class ApplicationManager extends EventEmitter
if target.dependsOn?
for dependency in target.dependsOn
dependencyService = _.find(targetApp.services, (s) -> s.serviceName == dependency)
if !_.find(availableImages, (image) => @images.isSameImage(image, { name: dependencyService.image }))?
if !_.find(availableImages, (image) => @images.isSameImage(image, { name: dependencyService.imageName }))?
return false
return true
@@ -619,7 +608,7 @@ module.exports = class ApplicationManager extends EventEmitter
# There is already a step in progress for this service, so we wait
return null
needsDownload = !_.some(availableImages, (image) => @images.isSameImage(image, { name: target.image }))
needsDownload = !_.some(availableImages, (image) => @images.isSameImage(image, { name: target.imageName }))
dependenciesMetForStart = =>
@_dependenciesMetForServiceStart(target, networkPairs, volumePairs, installPairs.concat(updatePairs), stepsInProgress)
dependenciesMetForKill = =>
@@ -656,7 +645,6 @@ module.exports = class ApplicationManager extends EventEmitter
networkPairs = @compareNetworksForUpdate({ current: currentApp.networks, target: targetApp.networks }, appId)
volumePairs = @compareVolumesForUpdate({ current: currentApp.volumes, target: targetApp.volumes }, appId)
{ removePairs, installPairs, updatePairs } = @compareServicesForUpdate(currentApp.services, targetApp.services)
imagePairs = @compareImagesForMetadataUpdate(availableImages, targetApp.services)
steps = []
# All removePairs get a 'kill' action
for pair in removePairs
@@ -676,8 +664,6 @@ module.exports = class ApplicationManager extends EventEmitter
for pair in volumePairs
pairSteps = @_nextStepsForVolume(pair, currentApp, removePairs.concat(updatePairs))
steps = steps.concat(pairSteps)
for pair in imagePairs
steps.push(_.assign({ action: 'updateImage' }, pair))
return steps
normaliseAppForDB: (app) =>
@@ -705,13 +691,16 @@ module.exports = class ApplicationManager extends EventEmitter
createTargetService: (service, opts) ->
@images.inspectByName(service.image)
.catchReturn(undefined)
.catchReturn(NotFoundError, undefined)
.then (imageInfo) ->
serviceOpts = {
serviceName: service.serviceName
imageInfo
}
_.assign(serviceOpts, opts)
service.imageName = service.image
if imageInfo?.Id?
service.image = imageInfo.Id
return new Service(service, serviceOpts)
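To make the new image/imageName split concrete: createTargetService above keeps the registry reference in imageName and, when the image is already available locally, replaces image with the local docker image ID. A minimal sketch with invented values:

service =
  serviceName: 'main'
  image: 'some-registry/some-app@sha256:111'   # reference coming from the target state

imageInfo = { Id: 'sha256:aaa' }               # stand-in for the result of images.inspectByName

service.imageName = service.image              # keep the registry reference for image comparisons
if imageInfo?.Id?
  service.image = imageInfo.Id                 # the container is created from the docker image ID

console.log(service.image, service.imageName)  # 'sha256:aaa' 'some-registry/some-app@sha256:111'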
normaliseAndExtendAppFromDB: (app) =>
@@ -803,30 +792,40 @@ module.exports = class ApplicationManager extends EventEmitter
return availableImage.name
return 'resin/scratch'
# return images that:
# returns:
# imagesToRemove: images that
# - are not used in the current state, and
# - are not going to be used in the target state, and
# - are not needed for delta source / pull caching or would be used for a service with delete-then-download as strategy
_unnecessaryImages: (current, target, available) =>
# imagesToSave: images that
# - are locally available (i.e. an image with the same digest exists)
# - are not saved to the DB with all their metadata (serviceId, serviceName, etc)
_compareImages: (current, target, available) =>
allImagesForApp = (app) -> _.map(app.services, imageForService)
currentImages = _.flatten(_.map(current.local.apps, allImagesForApp))
targetImages = _.flatten(_.map(target.local.apps, allImagesForApp))
availableAndUnused = _.filter available, (image) =>
!_.some currentImages.concat(targetImages), (imageInUse) => @images.isSameImage(image, imageInUse)
allImagesForTargetApp = (app) -> _.map(app.services, imageForService)
allImagesForCurrentApp = (app) ->
_.map app.services, (service) ->
_.omit(_.find(available, (image) -> image.dockerImageId == service.image), [ 'dockerImageId', 'id' ])
availableWithoutIds = _.map(available, (image) -> _.omit(image, [ 'dockerImageId', 'id' ]))
currentImages = _.flatten(_.map(current.local.apps, allImagesForCurrentApp))
targetImages = _.flatten(_.map(target.local.apps, allImagesForTargetApp))
availableAndUnused = _.filter availableWithoutIds, (image) ->
!_.some currentImages.concat(targetImages), (imageInUse) -> _.isEqual(image, imageInUse)
imagesToDownload = _.filter targetImages, (targetImage) =>
!_.some available, (availableImage) => @images.isSameImage(availableImage, targetImage)
# Images that are available but we don't have them in the DB with the exact metadata:
imagesToSave = _.filter targetImages, (targetImage) =>
_.some(available, (availableImage) => @images.isSameImage(availableImage, targetImage)) and
!_.find(availableWithoutIds, (img) -> _.isEqual(img, targetImage))?
deltaSources = _.map imagesToDownload, (image) =>
return @bestDeltaSource(image, available)
proxyvisorImages = @proxyvisor.imagesInUse(current, target)
return _.filter availableAndUnused, (image) =>
imagesToRemove = _.filter availableAndUnused, (image) =>
notUsedForDelta = !_.some deltaSources, (deltaSource) -> deltaSource == image.name
notUsedByProxyvisor = !_.some proxyvisorImages, (proxyvisorImage) => @images.isSameImage(image, { name: proxyvisorImage })
return notUsedForDelta and notUsedByProxyvisor
return { imagesToSave, imagesToRemove }
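A standalone sketch of the imagesToSave rule described in the comment above, using only lodash and the same digest comparison as images.coffee. The inputs are invented, and the real method additionally filters out delta sources and proxyvisor images:

_ = require 'lodash'

# Digest comparison equivalent to Images.isSameImage
isSameImage = (img1, img2) ->
  digest1 = img1.name.split('@')[1]
  digest2 = img2.name.split('@')[1]
  img1.name == img2.name or (digest1? and digest1 == digest2)

# One image is present locally, but its DB row is missing the service metadata
available = [
  { id: 1, dockerImageId: 'sha256:aaa', name: 'some-registry/some-app@sha256:111', appId: 1000, serviceId: null, serviceName: null, imageId: null, releaseId: null, dependent: 0 }
]
# The target state has the full metadata for the same digest
targetImages = [
  { name: 'some-registry/some-app@sha256:111', appId: 1000, serviceId: 10, serviceName: 'main', imageId: 20, releaseId: 30, dependent: 0 }
]

availableWithoutIds = _.map(available, (image) -> _.omit(image, [ 'dockerImageId', 'id' ]))

# Same filter as in _compareImages: the digest exists locally, but no DB row
# carries exactly the target metadata, so a 'saveImage' step will be generated.
imagesToSave = _.filter targetImages, (targetImage) ->
  _.some(available, (availableImage) -> isSameImage(availableImage, targetImage)) and
    !_.find(availableWithoutIds, (img) -> _.isEqual(img, targetImage))?

console.log(imagesToSave.length)  # 1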
_inferNextSteps: (cleanupNeeded, availableImages, supervisorNetworkReady, current, target, stepsInProgress) =>
Promise.try =>
@@ -839,9 +838,12 @@ module.exports = class ApplicationManager extends EventEmitter
if !_.some(stepsInProgress, (step) -> step.action == 'fetch')
if cleanupNeeded
nextSteps.push({ action: 'cleanup' })
imagesToRemove = @_unnecessaryImages(current, target, availableImages)
for image in imagesToRemove
nextSteps.push({ action: 'removeImage', image })
{ imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages)
for image in imagesToSave
nextSteps.push({ action: 'saveImage', image })
if _.isEmpty(imagesToSave)
for image in imagesToRemove
nextSteps.push({ action: 'removeImage', image })
# If we have to remove any images, we do that before anything else
if _.isEmpty(nextSteps)
allAppIds = _.union(_.keys(currentByAppId), _.keys(targetByAppId))

View File

@@ -8,13 +8,14 @@ validation = require '../lib/validation'
{ NotFoundError } = require '../lib/errors'
# image = {
# name: image registry/repo:tag
# name: image registry/repo@digest or registry/repo:tag
# appId
# serviceId
# serviceName
# imageId (from resin API)
# releaseId
# dependent
# dockerImageId
# status Downloading, Downloaded, Deleting
# downloadProgress
# }
@@ -45,15 +46,25 @@ module.exports = class Images extends EventEmitter
@markAsSupervised(image)
.then =>
@inspectByName(imageName)
.tap (img) =>
@db.models('image').update({ dockerImageId: img.Id }).where(image)
.catch =>
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 }))
Promise.try =>
if validation.checkTruthy(opts.delta)
@logger.logSystemEvent(logTypes.downloadImageDelta, { image })
@docker.rsyncImageWithProgress(imageName, opts, onProgress)
Promise.try =>
if opts.deltaSource
@inspectByName(opts.deltaSource)
.then (srcImage) ->
opts.deltaSourceId = srcImage.Id
.then =>
@docker.rsyncImageWithProgress(imageName, opts, onProgress)
else
@logger.logSystemEvent(logTypes.downloadImage, { image })
@docker.fetchImageWithProgress(imageName, opts, onProgress)
.then (id) =>
@db.models('image').update({ dockerImageId: id }).where(image)
.then =>
@logger.logSystemEvent(logTypes.downloadImageSuccess, { image })
@inspectByName(imageName)
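For context on the deltaSourceId handling added above: when a delta is requested, the source image name is resolved to its local docker image ID before the delta is applied (docker-utils now uses deltaSourceId as the delta base). A small sketch of that option preparation, with bluebird and a stubbed inspect — the names and IDs are illustrative:

Promise = require 'bluebird'

# Stand-in for Images::inspectByName; the real code inspects the image via docker
inspectByName = (name) -> Promise.resolve({ Id: 'sha256:aaa' })

prepareDeltaOpts = (opts) ->
  Promise.try ->
    if opts.deltaSource?
      inspectByName(opts.deltaSource).then (srcImage) ->
        opts.deltaSourceId = srcImage.Id   # the delta is applied on top of this docker image ID
  .then -> opts

prepareDeltaOpts({ delta: true, deltaSource: 'some-registry/old-app@sha256:000' })
  .then (opts) -> console.log(opts.deltaSourceId)   # 'sha256:aaa'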
@@ -68,7 +79,8 @@ module.exports = class Images extends EventEmitter
image.serviceName ?= null
image.imageId ?= null
image.releaseId ?= null
image.dependent ?= false
image.dependent ?= 0
image.dockerImageId ?= null
return _.omit(image, 'id')
markAsSupervised: (image) =>
@@ -79,39 +91,64 @@ module.exports = class Images extends EventEmitter
image = @format(image)
@db.models('image').update(image).where(name: image.name)
_removeImageIfNotNeeded: (image) =>
save: (image) =>
@inspectByName(image.name)
.then (img) =>
@db.models('image').where(name: image.name).select()
.then (imagesFromDB) =>
if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(image))
@docker.getImage(image.name).remove(force: true)
.return(true)
.catchReturn(NotFoundError, false)
image = _.clone(image)
image.dockerImageId = img.Id
@markAsSupervised(image)
remove: (image) =>
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Deleting' }))
@logger.logSystemEvent(logTypes.deleteImage, { image })
@_removeImageIfNotNeeded(image)
.tap =>
@db.models('image').del().where(image)
_removeImageIfNotNeeded: (image) =>
# We first fetch the image from the DB to ensure it exists,
# and get the dockerImageId and any other missing field
@db.models('image').select().where(image)
.then (images) =>
if images.length == 0
return false
img = images[0]
Promise.try =>
if !img.dockerImageId?
# Legacy image from before we started using dockerImageId, so we try to remove it by name
@docker.getImage(img.name).remove(force: true)
.return(true)
else
@db.models('image').where(dockerImageId: img.dockerImageId).select()
.then (imagesFromDB) =>
if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(img))
@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Deleting' }))
@logger.logSystemEvent(logTypes.deleteImage, { image })
@docker.getImage(img.dockerImageId).remove(force: true)
.return(true)
else
return false
.catchReturn(NotFoundError, false)
.tap =>
@db.models('image').del().where(id: img.id)
.then (removed) =>
if removed
@logger.logSystemEvent(logTypes.deleteImageSuccess, { image })
else
@logger.logSystemEvent(logTypes.imageAlreadyDeleted, { image })
.finally =>
@reportChange(image.imageId)
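The removal path above only deletes the docker image itself when the row being removed is the last DB entry pointing at that dockerImageId; the row in the image table is deleted either way. A simplified, standalone version of that check — the rows are made up, and the real code also compares the formatted rows and handles legacy rows without a dockerImageId:

_ = require 'lodash'

# Two tracked rows that happen to share the same underlying docker image
dbImages = [
  { id: 1, name: 'some-registry/app-a@sha256:111', dockerImageId: 'sha256:aaa' }
  { id: 2, name: 'some-registry/app-b@sha256:222', dockerImageId: 'sha256:aaa' }
]

# Only remove the docker image if no other row still references its ID
shouldRemoveFromDocker = (img) ->
  _.filter(dbImages, (row) -> row.dockerImageId == img.dockerImageId).length == 1

console.log(shouldRemoveFromDocker(dbImages[0]))  # false: the second row still needs sha256:aaa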
remove: (image) =>
@_removeImageIfNotNeeded(image)
.catch (err) =>
@logger.logSystemEvent(logTypes.deleteImageError, { image, error: err })
throw err
.finally =>
@reportChange(image.imageId)
getByDockerId: (id) =>
@db.models('image').where(dockerImageId: id).first()
removeByDockerId: (id) =>
@getByDockerId(id)
.then(@remove)
getNormalisedTags: (image) ->
Promise.map(image.RepoTags ? [], (tag) => @normalise(tag))
_withImagesFromDockerAndDB: (callback) =>
Promise.join(
@docker.listImages()
@docker.listImages(digests: true)
.map (image) =>
image.NormalisedRepoTags = @getNormalisedTags(image)
Promise.props(image)
@@ -119,9 +156,13 @@ module.exports = class Images extends EventEmitter
callback
)
_isAvailableInDocker: (image, dockerImages) ->
_.some dockerImages, (dockerImage) ->
_.includes(dockerImage.NormalisedRepoTags, image.name) or _.includes(dockerImage.RepoDigests, image.name)
_matchesTagOrDigest: (image, dockerImage) ->
return _.includes(dockerImage.NormalisedRepoTags, image.name) or
_.some(dockerImage.RepoDigests, (digest) -> Images.hasSameDigest(image.name, digest))
_isAvailableInDocker: (image, dockerImages) =>
_.some dockerImages, (dockerImage) =>
@_matchesTagOrDigest(image, dockerImage) or image.dockerImageId == dockerImage.Id
# Gets all images that are supervised, in an object containing name, appId, serviceId, serviceName, imageId, dependent.
getAvailable: =>
@@ -130,7 +171,17 @@ module.exports = class Images extends EventEmitter
cleanupDatabase: =>
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
return _.filter(supervisedImages, (image) => !@_isAvailableInDocker(image, dockerImages))
Promise.map supervisedImages, (image) =>
# If the supervisor was interrupted between fetching an image and storing its id,
# some entries in the db might need to have the dockerImageId populated
if !image.dockerImageId?
id = _.find(dockerImages, (dockerImage) => @_matchesTagOrDigest(image, dockerImage))?.Id
if id?
@db.models('image').update(dockerImageId: id).where(image)
.then ->
image.dockerImageId = id
.then =>
_.filter(supervisedImages, (image) => !@_isAvailableInDocker(image, dockerImages))
.then (imagesToRemove) =>
ids = _.map(imagesToRemove, 'id')
@db.models('image').del().whereIn('id', ids)
@@ -147,35 +198,56 @@ module.exports = class Images extends EventEmitter
status[image.imageId] ?= image
return _.values(status)
_getOldSupervisorsForCleanup: =>
_getImagesForCleanup: =>
images = []
@docker.getRegistryAndName(constants.supervisorImage)
.then (supervisorImageInfo) =>
@docker.listImages()
.map (image) =>
Promise.map image.RepoTags ? [], (repoTag) =>
@docker.getRegistryAndName(repoTag)
.then ({ imageName, tagName }) ->
if imageName == supervisorImageInfo.imageName and tagName != supervisorImageInfo.tagName
images.push(repoTag)
Promise.join(
@docker.getRegistryAndName(constants.supervisorImage)
@db.models('image').select('dockerImageId')
.map((image) -> image.dockerImageId)
(supervisorImageInfo, usedImageIds) =>
@docker.listImages(digests: true)
.map (image) =>
# Cleanup should remove truly dangling images (i.e. dangling and with no digests)
if _.isEmpty(image.RepoTags) and _.isEmpty(image.RepoDigests) and image.Id not in usedImageIds
images.push(image.Id)
else if !_.isEmpty(image.RepoTags)
# We also remove images from the supervisor repository with a different tag
Promise.map image.RepoTags, (repoTag) =>
@docker.getRegistryAndName(repoTag)
.then ({ imageName, tagName }) ->
if imageName == supervisorImageInfo.imageName and tagName != supervisorImageInfo.tagName
images.push(image.Id)
)
.then(_.uniq)
.then =>
return _.filter images, (image) =>
!@imageCleanupFailures[image]? or Date.now() - @imageCleanupFailures[image] > constants.imageCleanupErrorIgnoreTimeout
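The two cleanup rules in _getImagesForCleanup above — truly dangling images that no row in the image table references, plus supervisor images with a different tag — in a simplified standalone form. The listImages output and supervisor image name below are invented, and the tag parsing is deliberately naive compared to getRegistryAndName:

_ = require 'lodash'

dockerImages = [
  { Id: 'sha256:aaa', RepoTags: [], RepoDigests: [] }                                       # truly dangling
  { Id: 'sha256:bbb', RepoTags: [], RepoDigests: [ 'some-registry/some-app@sha256:111' ] }  # untagged but pinned by digest
  { Id: 'sha256:ccc', RepoTags: [ 'resin/armv7hf-supervisor:v6.0.0' ], RepoDigests: [] }    # old supervisor tag
]
usedImageIds = [ 'sha256:bbb' ]
supervisorImage = 'resin/armv7hf-supervisor:v6.6.0'

imagesForCleanup = []
for image in dockerImages
  if _.isEmpty(image.RepoTags) and _.isEmpty(image.RepoDigests) and image.Id not in usedImageIds
    imagesForCleanup.push(image.Id)
  else
    for repoTag in image.RepoTags
      [ repo, tag ] = repoTag.split(':')
      [ supervisorRepo, supervisorTag ] = supervisorImage.split(':')
      if repo == supervisorRepo and tag != supervisorTag
        imagesForCleanup.push(image.Id)

console.log(_.uniq(imagesForCleanup))  # [ 'sha256:aaa', 'sha256:ccc' ]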
inspectByName: (imageName) =>
@docker.getImage(imageName).inspect()
.catch NotFoundError, (err) =>
digest = imageName.split('@')[1]
if !digest?
throw err
@db.models('image').where('name', 'like', "%@#{digest}").select()
.then (imagesFromDB) =>
for image in imagesFromDB
if image.dockerImageId?
return @docker.getImage(image.dockerImageId).inspect()
throw err
normalise: (imageName) =>
@docker.normaliseImageName(imageName)
isCleanupNeeded: =>
@_getOldSupervisorsForCleanup()
@_getImagesForCleanup()
.then (imagesForCleanup) ->
return !_.isEmpty(imagesForCleanup)
# Delete old supervisor images
# Delete dangling images and old supervisor images
cleanup: =>
@_getOldSupervisorsForCleanup()
@_getImagesForCleanup()
.map (image) =>
console.log("Cleaning up #{image}")
@docker.getImage(image).remove(force: true)
@@ -185,9 +257,12 @@ module.exports = class Images extends EventEmitter
@logger.logSystemMessage("Error cleaning up #{image}: #{err.message} - will ignore for 1 hour", { error: err }, 'Image cleanup error')
@imageCleanupFailures[image] = Date.now()
@hasSameDigest: (name1, name2) ->
hash1 = name1.split('@')[1]
hash2 = name2.split('@')[1]
return hash1? and hash1 == hash2
@isSameImage: (image1, image2) ->
hash1 = image1.name.split('@')[1]
hash2 = image2.name.split('@')[1]
return image1.name == image2.name or (hash1? and hash1 == hash2)
return image1.name == image2.name or Images.hasSameDigest(image1.name, image2.name)
isSameImage: @isSameImage
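A short usage sketch of the digest comparison introduced above, reimplemented standalone so it runs on its own (the digests are shortened, made-up values): two references that pin the same digest are treated as the same image even if the repository or tag differ, while plain tags only match on exact name equality.

hasSameDigest = (name1, name2) ->
  hash1 = name1.split('@')[1]
  hash2 = name2.split('@')[1]
  hash1? and hash1 == hash2

isSameImage = (image1, image2) ->
  image1.name == image2.name or hasSameDigest(image1.name, image2.name)

a = { name: 'registry.example.com/foo/app@sha256:111' }
b = { name: 'registry.example.com/bar/app@sha256:111' }
console.log(isSameImage(a, b))                                                 # true: same digest
console.log(isSameImage({ name: 'foo/app:latest' }, { name: 'foo/app:1.0' }))  # false: different tags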

View File

@@ -10,9 +10,11 @@ module.exports = class Networks
# TODO: parse supported config fields
format: (network) ->
[ appId, name ] = network.Name.split('_')
m = network.Name.match(/^([0-9]+)_(.+)$/)
appId = checkInt(m[1])
name = m[2]
return {
appId: checkInt(appId)
appId: appId
name: name
config: {}
}

View File

@@ -6,7 +6,6 @@ constants = require '../lib/constants'
conversions = require '../lib/conversions'
Duration = require 'duration-js'
Images = require './images'
validRestartPolicies = [ 'no', 'always', 'on-failure', 'unless-stopped' ]
@@ -112,6 +111,7 @@ module.exports = class Service
constructor: (serviceProperties, opts = {}) ->
{
@image
@imageName
@expose
@ports
@networkMode
@@ -553,6 +553,7 @@ module.exports = class Service
isSameContainer: (otherService) =>
propertiesToCompare = [
'image'
'command'
'entrypoint'
'networkMode'
@@ -590,8 +591,7 @@ module.exports = class Service
'extraHosts'
'ulimitsArray'
]
isEq = Images.isSameImage({ name: @image }, { name: otherService.image }) and
_.isEqual(_.pick(this, propertiesToCompare), _.pick(otherService, propertiesToCompare)) and
isEq = _.isEqual(_.pick(this, propertiesToCompare), _.pick(otherService, propertiesToCompare)) and
@hasSameNetworks(otherService) and
_.every arraysToCompare, (property) =>
_.isEmpty(_.xorWith(this[property], otherService[property], _.isEqual))

View File

@@ -12,10 +12,12 @@ module.exports = class Volumes
constructor: ({ @docker, @logger }) ->
format: (volume) =>
[ appId, name ] = volume.Name.split('_')
m = volume.Name.match(/^([0-9]+)_(.+)$/)
appId = checkInt(m[1])
name = m[2]
return {
name: name
appId: checkInt(appId)
appId: appId
config: {
labels: _.omit(volume.Labels, _.keys(@defaultLabels(appId)))
driverOpts: volume.Options

View File

@@ -49,13 +49,14 @@ module.exports = class DockerUtils extends DockerToolbelt
{
deltaRequestTimeout, deltaApplyTimeout, deltaRetryCount, deltaRetryInterval,
uuid, currentApiKey, deltaEndpoint, resinApiEndpoint,
deltaSource, startFromEmpty = false
deltaSource, deltaSourceId, startFromEmpty = false
} = fullDeltaOpts
retryCount = checkInt(deltaRetryCount)
retryInterval = checkInt(deltaRetryInterval)
requestTimeout = checkInt(deltaRequestTimeout)
applyTimeout = checkInt(deltaApplyTimeout)
deltaSource = 'resin/scratch' if startFromEmpty or !deltaSource?
deltaSourceId ?= deltaSource
# I'll leave this debug log here in case we ever wonder what delta source a device is using in production
console.log("Using delta source #{deltaSource}")
Promise.join @getRegistryAndName(imgDest), @getRegistryAndName(deltaSource), (dstInfo, srcInfo) ->
@@ -92,14 +93,10 @@ module.exports = class DockerUtils extends DockerToolbelt
if deltaSource is 'resin/scratch'
deltaSrc = null
else
deltaSrc = deltaSource
deltaSrc = deltaSourceId
resumeOpts = { maxRetries: retryCount, retryInterval }
resolve(applyDelta(deltaSrc, deltaUrl, { requestTimeout, applyTimeout, resumeOpts }, onProgress))
.on 'error', reject
.then (id) =>
@getRepoAndTag(imgDest)
.then ({ repo, tag }) =>
@getImage(id).tag({ repo, tag, force: true })
.catch dockerDelta.OutOfSyncError, (err) =>
throw err if startFromEmpty
console.log('Falling back to delta-from-empty')
@@ -109,13 +106,15 @@ module.exports = class DockerUtils extends DockerToolbelt
fetchImageWithProgress: (image, { uuid, currentApiKey }, onProgress) =>
@getRegistryAndName(image)
.then ({ registry, imageName, tagName }) =>
.then ({ registry }) =>
dockerOptions =
authconfig:
username: 'd_' + uuid,
password: currentApiKey,
serveraddress: registry
@dockerProgress.pull(image, onProgress, dockerOptions)
.then =>
@getImage(image).inspect().get('Id')
getImageEnv: (id) ->
@getImage(id).inspect()

View File

@@ -0,0 +1,10 @@
// Adds a dockerImageId column to the image table to identify images downloaded with deltas
exports.up = function (knex, Promise) {
return knex.schema.table('image', (t) => {
t.string('dockerImageId')
})
}
exports.down = function(knex, Promise) {
return Promise.try(() => { throw new Error('Not implemented') })
}