Mirror of https://github.com/balena-os/balena-supervisor.git (synced 2025-04-19 08:36:14 +00:00)

refactor: Convert compose/images module to typescript

Change-type: minor
Signed-off-by: Cameron Diver <cameron@balena.io>

Parent: f10ad00e01
Commit: 91b553dd32
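Note for callers (an illustrative TypeScript sketch, not part of the commit): the converted module exposes the Images class as both a named and a default export, and image comparison becomes a static method, which is why the call sites below switch from @images.isSameImage(...) to Images.isSameImage(...). The registry and digest values here are made up.

import Images from './compose/images';
// import { Images } from './compose/images'; // the named form works too

// isSameImage() is now static, so call sites that only compare names or digests
// no longer need an Images instance. Hypothetical image references:
const a = { name: 'registry2.example.com/v2/app@sha256:aaaa' };
const b = { name: 'registry2.example.com/v2/other@sha256:aaaa' };
console.log(Images.isSameImage(a as any, b as any)); // true: the digests match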
src/application-manager.coffee

@@ -17,7 +17,7 @@ updateLock = require './lib/update-lock'
 
 ServiceManager = require './compose/service-manager'
 { Service } = require './compose/service'
-Images = require './compose/images'
+{ Images } = require './compose/images'
 { NetworkManager } = require './compose/network-manager'
 { Network } = require './compose/network'
 Volumes = require './compose/volumes'
@@ -469,11 +469,11 @@ module.exports = class ApplicationManager extends EventEmitter
 	# Unless the update strategy requires an early kill (i.e. kill-then-download, delete-then-download), we only want
 	# to kill a service once the images for the services it depends on have been downloaded, so as to minimize
 	# downtime (but not block the killing too much, potentially causing a deadlock)
-	_dependenciesMetForServiceKill: (target, targetApp, availableImages) =>
+	_dependenciesMetForServiceKill: (target, targetApp, availableImages) ->
 		if target.dependsOn?
 			for dependency in target.dependsOn
 				dependencyService = _.find(targetApp.services, serviceName: dependency)
-				if !_.some(availableImages, (image) => image.dockerImageId == dependencyService.image or @images.isSameImage(image, { name: dependencyService.imageName }))
+				if !_.some(availableImages, (image) -> image.dockerImageId == dependencyService.image or Images.isSameImage(image, { name: dependencyService.imageName }))
					return false
		return true
 
@@ -563,8 +563,8 @@ module.exports = class ApplicationManager extends EventEmitter
 		needsDownload = false
 		# Don't attempt to fetch any images in local mode, they should already be there
 		if !localMode
-			needsDownload = !_.some availableImages, (image) =>
-				image.dockerImageId == target?.config.image or @images.isSameImage(image, { name: target.imageName })
+			needsDownload = !_.some availableImages, (image) ->
+				image.dockerImageId == target?.config.image or Images.isSameImage(image, { name: target.imageName })
 
 		# This service needs an image download but it's currently downloading, so we wait
 		if needsDownload and target?.imageId in downloading
@@ -671,7 +671,9 @@ module.exports = class ApplicationManager extends EventEmitter
 			return dbApp
 
 	createTargetService: (service, opts) ->
-		@images.inspectByName(service.image)
+		# The image class now returns a native promise, so wrap
+		# this in a bluebird promise until we convert this to typescript
+		Promise.resolve(@images.inspectByName(service.image))
 		.catchReturn(NotFoundError, undefined)
 		.then (imageInfo) ->
 			serviceOpts = {
@@ -818,16 +820,16 @@ module.exports = class ApplicationManager extends EventEmitter
 		availableAndUnused = _.filter availableWithoutIds, (image) ->
 			!_.some currentImages.concat(targetImages), (imageInUse) -> _.isEqual(image, imageInUse)
 
-		imagesToDownload = _.filter targetImages, (targetImage) =>
-			!_.some available, (availableImage) => @images.isSameImage(availableImage, targetImage)
+		imagesToDownload = _.filter targetImages, (targetImage) ->
+			!_.some available, (availableImage) -> Images.isSameImage(availableImage, targetImage)
 
 		# Images that are available but we don't have them in the DB with the exact metadata:
 		imagesToSave = []
 		if !localMode
-			imagesToSave = _.filter targetImages, (targetImage) =>
+			imagesToSave = _.filter targetImages, (targetImage) ->
 				isActuallyAvailable = _.some(
-					available, (availableImage) =>
-						if @images.isSameImage(availableImage, targetImage)
+					available, (availableImage) ->
+						if Images.isSameImage(availableImage, targetImage)
 							return true
 						if availableImage.dockerImageId == targetImageDockerIds[targetImage.name]
 							return true
@@ -840,9 +842,9 @@ module.exports = class ApplicationManager extends EventEmitter
 			return @bestDeltaSource(image, available)
 		proxyvisorImages = @proxyvisor.imagesInUse(current, target)
 
-		imagesToRemove = _.filter availableAndUnused, (image) =>
+		imagesToRemove = _.filter availableAndUnused, (image) ->
 			notUsedForDelta = !_.includes(deltaSources, image.name)
-			notUsedByProxyvisor = !_.some proxyvisorImages, (proxyvisorImage) => @images.isSameImage(image, { name: proxyvisorImage })
+			notUsedByProxyvisor = !_.some proxyvisorImages, (proxyvisorImage) -> Images.isSameImage(image, { name: proxyvisorImage })
 			return notUsedForDelta and notUsedByProxyvisor
 		return { imagesToSave, imagesToRemove }
 
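For clarity, here is a TypeScript sketch of the dependency check described by the comment above (a service is only killed once the images for the services it depends on are already available). The service shapes are illustrative, not the supervisor's real types, and the cast to any stands in for the full Image interface.

import * as _ from 'lodash';
import Images, { Image } from './compose/images';

// Illustrative shapes; the real service model in the supervisor is richer.
interface TargetService {
	dependsOn?: string[];
}
interface TargetApp {
	services: Array<{ serviceName: string; image: string; imageName: string }>;
}

function dependenciesMetForServiceKill(
	target: TargetService,
	targetApp: TargetApp,
	availableImages: Image[],
): boolean {
	for (const dependency of target.dependsOn || []) {
		const dependencyService = _.find(targetApp.services, {
			serviceName: dependency,
		});
		if (dependencyService == null) {
			continue;
		}
		const available = _.some(
			availableImages,
			img =>
				img.dockerImageId === dependencyService.image ||
				// Cast because only the name matters for this comparison
				Images.isSameImage(img, { name: dependencyService.imageName } as any),
		);
		if (!available) {
			return false;
		}
	}
	return true;
}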
src/application-manager.d.ts (vendored): 2 lines changed
@@ -6,7 +6,7 @@ import { DeviceApplicationState } from './types/state';
 import { Logger } from './logger';
 import { EventTracker } from './event-tracker';
 
-import Images = require('./compose/images');
+import Images from './compose/images';
 import ServiceManager = require('./compose/service-manager');
 import DB from './db';
 
src/compose/images.coffee (deleted): 326 lines

@@ -1,326 +0,0 @@
Promise = require 'bluebird'
_ = require 'lodash'
EventEmitter = require 'events'
logTypes = require '../lib/log-types'
constants = require '../lib/constants'
validation = require '../lib/validation'

{ DeltaStillProcessingError, NotFoundError } = require '../lib/errors'

# image = {
# 	name: image registry/repo@digest or registry/repo:tag
# 	appId
# 	serviceId
# 	serviceName
# 	imageId (from balena API)
# 	releaseId
# 	dependent
# 	dockerImageId
# 	status Downloading, Downloaded, Deleting
# 	downloadProgress
# }

hasDigest = (name) ->
	name?.split?('@')?[1]?

module.exports = class Images extends EventEmitter
	constructor: ({ @docker, @logger, @db }) ->
		@imageCleanupFailures = {}
		# A store of volatile state for images (e.g. download progress), indexed by imageId
		@volatileState = {}

	reportChange: (imageId, status) ->
		if status?
			@volatileState[imageId] ?= { imageId }
			_.merge(@volatileState[imageId], status)
			@emit('change')
		else if imageId? and @volatileState[imageId]?
			delete @volatileState[imageId]
			@emit('change')

	triggerFetch: (image, opts, onFinish = _.noop) =>
		onProgress = (progress) =>
			# Only report the percentage if we haven't finished fetching
			if @volatileState[image.imageId]?
				@reportChange(image.imageId, { downloadProgress: progress.percentage })

		@normalise(image.name)
		.then (imageName) =>
			image = _.clone(image)
			image.name = imageName
			@markAsSupervised(image)
			.then =>
				@inspectByName(imageName)
			.then (img) =>
				@db.models('image').update({ dockerImageId: img.Id }).where(image)
			.then ->
				onFinish(true)
				return null
			.catch =>
				@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 }))
				Promise.try =>
					if opts.delta and opts.deltaSource?
						@logger.logSystemEvent(logTypes.downloadImageDelta, { image })
						@inspectByName(opts.deltaSource)
						.then (srcImage) =>
							opts.deltaSourceId = srcImage.Id
							@docker.fetchDeltaWithProgress(imageName, opts, onProgress)
							.tap (id) =>
								if !hasDigest(imageName)
									@docker.getRepoAndTag(imageName)
									.then ({ repo, tag }) =>
										@docker.getImage(id).tag({ repo, tag })
					else
						@logger.logSystemEvent(logTypes.downloadImage, { image })
						@docker.fetchImageWithProgress(imageName, opts, onProgress)
				.then (id) =>
					@db.models('image').update({ dockerImageId: id }).where(image)
					.then =>
						@logger.logSystemEvent(logTypes.downloadImageSuccess, { image })
						return true
				.catch DeltaStillProcessingError, =>
					# If this is a delta image pull, and the delta still hasn't finished generating,
					# don't show a failure message, and instead just inform the user that it's remotely
					# processing
					@logger.logSystemEvent(logTypes.deltaStillProcessingError)
					return false
				.catch (err) =>
					@logger.logSystemEvent(logTypes.downloadImageError, { image, error: err })
					return false
				.then (success) =>
					@reportChange(image.imageId)
					onFinish(success)
					return null
		return null

	format: (image) ->
		image.serviceId ?= null
		image.serviceName ?= null
		image.imageId ?= null
		image.releaseId ?= null
		image.dependent ?= 0
		image.dockerImageId ?= null
		return _.omit(image, 'id')

	markAsSupervised: (image) =>
		image = @format(image)
		@db.upsertModel('image', image, image)

	update: (image) =>
		image = @format(image)
		@db.models('image').update(image).where(name: image.name)

	save: (image) =>
		@inspectByName(image.name)
		.then (img) =>
			image = _.clone(image)
			image.dockerImageId = img.Id
			@markAsSupervised(image)

	_removeImageIfNotNeeded: (image) =>
		# We first fetch the image from the DB to ensure it exists,
		# and get the dockerImageId and any other missing field
		@db.models('image').select().where(image)
		.then (images) =>
			if images.length == 0
				return false
			img = images[0]
			Promise.try =>
				if !img.dockerImageId?
					# Legacy image from before we started using dockerImageId, so we try to remove it by name
					@docker.getImage(img.name).remove(force: true)
					.return(true)
				else
					@db.models('image').where(dockerImageId: img.dockerImageId).select()
					.then (imagesFromDB) =>
						if imagesFromDB.length == 1 and _.isEqual(@format(imagesFromDB[0]), @format(img))
							@reportChange(image.imageId, _.merge(_.clone(image), { status: 'Deleting' }))
							@logger.logSystemEvent(logTypes.deleteImage, { image })
							@docker.getImage(img.dockerImageId).remove(force: true)
							.return(true)
						else if !hasDigest(img.name)
							# Image has a regular tag, so we might have to remove unnecessary tags
							@docker.getImage(img.dockerImageId).inspect()
							.then (dockerImg) =>
								differentTags = _.reject(imagesFromDB, name: img.name)
								if dockerImg.RepoTags.length > 1 and
								_.includes(dockerImg.RepoTags, img.name) and
								_.some(dockerImg.RepoTags, (tag) -> _.some(differentTags, name: tag))
									@docker.getImage(img.name).remove(noprune: true)
							.return(false)
						else
							return false
			.catchReturn(NotFoundError, false)
			.tap =>
				@db.models('image').del().where(id: img.id)
			.then (removed) =>
				if removed
					@logger.logSystemEvent(logTypes.deleteImageSuccess, { image })
		.finally =>
			@reportChange(image.imageId)

	remove: (image) =>
		@_removeImageIfNotNeeded(image)
		.tapCatch (err) =>
			@logger.logSystemEvent(logTypes.deleteImageError, { image, error: err })

	getByDockerId: (id) =>
		@db.models('image').where(dockerImageId: id).first()

	removeByDockerId: (id) =>
		@getByDockerId(id)
		.then(@remove)

	getNormalisedTags: (image) ->
		Promise.map(image.RepoTags ? [], @normalise)

	_withImagesFromDockerAndDB: (callback) =>
		Promise.join(
			@docker.listImages(digests: true)
			.map (image) =>
				image.NormalisedRepoTags = @getNormalisedTags(image)
				Promise.props(image)
			@db.models('image').select()
			callback
		)

	_matchesTagOrDigest: (image, dockerImage) ->
		return _.includes(dockerImage.NormalisedRepoTags, image.name) or
			_.some(dockerImage.RepoDigests, (digest) -> Images.hasSameDigest(image.name, digest))

	_isAvailableInDocker: (image, dockerImages) =>
		_.some dockerImages, (dockerImage) =>
			@_matchesTagOrDigest(image, dockerImage) or image.dockerImageId == dockerImage.Id

	# Gets all images that are supervised, in an object containing name, appId, serviceId, serviceName, imageId, dependent.
	getAvailable: (localMode) =>
		@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
			_.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages))
		.then (images) =>
			if localMode
				# Get all images present on the local daemon which are tagged as local images
				return @_getLocalModeImages().then (localImages) ->
					images.concat(localImages)
			return images

	getDownloadingImageIds: =>
		Promise.try =>
			return _.map(_.keys(_.pickBy(@volatileState, status: 'Downloading')), validation.checkInt)

	cleanupDatabase: =>
		@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
			Promise.map supervisedImages, (image) =>
				# If the supervisor was interrupted between fetching an image and storing its id,
				# some entries in the db might need to have the dockerImageId populated
				if !image.dockerImageId?
					id = _.find(dockerImages, (dockerImage) => @_matchesTagOrDigest(image, dockerImage))?.Id
					if id?
						@db.models('image').update(dockerImageId: id).where(image)
						.then ->
							image.dockerImageId = id
			.then =>
				_.filter(supervisedImages, (image) => !@_isAvailableInDocker(image, dockerImages))
		.then (imagesToRemove) =>
			ids = _.map(imagesToRemove, 'id')
			@db.models('image').del().whereIn('id', ids)

	getStatus: (localMode) =>
		@getAvailable(localMode)
		.map (image) ->
			image.status = 'Downloaded'
			image.downloadProgress = null
			return image
		.then (images) =>
			status = _.clone(@volatileState)
			for image in images
				status[image.imageId] ?= image
			return _.values(status)

	_getImagesForCleanup: =>
		images = []
		Promise.join(
			@docker.getRegistryAndName(constants.supervisorImage)
			@docker.getImage(constants.supervisorImage).inspect()
			@db.models('image').select('dockerImageId')
			.map((image) -> image.dockerImageId)
			(supervisorImageInfo, supervisorImage, usedImageIds) =>
				isSupervisorRepoTag = ({ imageName, tagName }) ->
					supervisorRepos = [ supervisorImageInfo.imageName ]
					if _.startsWith(supervisorImageInfo.imageName, 'balena/') # We're on a new balena/ARCH-supervisor image
						supervisorRepos.push(supervisorImageInfo.imageName.replace(/^balena/, 'resin'))
					return _.some(supervisorRepos, (repo) -> imageName == repo) and tagName != supervisorImageInfo.tagName
				isDangling = (image) ->
					# Looks like dangling images show up with these weird RepoTags and RepoDigests sometimes
					(_.isEmpty(image.RepoTags) or _.isEqual(image.RepoTags, [ '<none>:<none>' ])) and
						(_.isEmpty(image.RepoDigests) or _.isEqual(image.RepoDigests, [ '<none>@<none>' ]))
				@docker.listImages(digests: true)
				.map (image) =>
					# Cleanup should remove truly dangling images (i.e. dangling and with no digests)
					if isDangling(image) and not (image.Id in usedImageIds)
						images.push(image.Id)
					else if !_.isEmpty(image.RepoTags) and image.Id != supervisorImage.Id
						# We also remove images from the supervisor repository with a different tag
						Promise.map image.RepoTags, (repoTag) =>
							@docker.getRegistryAndName(repoTag)
							.then (imageNameComponents) ->
								if isSupervisorRepoTag(imageNameComponents)
									images.push(image.Id)
		)
		.then =>
			toCleanup = _.filter _.uniq(images), (image) =>
				!@imageCleanupFailures[image]? or Date.now() - @imageCleanupFailures[image] > constants.imageCleanupErrorIgnoreTimeout
			return toCleanup

	inspectByName: (imageName) =>
		@docker.getImage(imageName).inspect()
		.catch NotFoundError, (err) =>
			digest = imageName.split('@')[1]
			Promise.try =>
				if digest?
					@db.models('image').where('name', 'like', "%@#{digest}").select()
				else
					@db.models('image').where(name: imageName).select()
			.then (imagesFromDB) =>
				for image in imagesFromDB
					if image.dockerImageId?
						return @docker.getImage(image.dockerImageId).inspect()
				throw err

	normalise: (imageName) =>
		@docker.normaliseImageName(imageName)

	isCleanupNeeded: =>
		@_getImagesForCleanup()
		.then (imagesForCleanup) ->
			return !_.isEmpty(imagesForCleanup)

	# Delete dangling images and old supervisor images
	cleanup: =>
		@_getImagesForCleanup()
		.map (image) =>
			console.log("Cleaning up #{image}")
			@docker.getImage(image).remove(force: true)
			.then =>
				delete @imageCleanupFailures[image]
			.catch (err) =>
				@logger.logSystemMessage("Error cleaning up #{image}: #{err.message} - will ignore for 1 hour", { error: err }, 'Image cleanup error')
				@imageCleanupFailures[image] = Date.now()

	@hasSameDigest: (name1, name2) ->
		hash1 = name1?.split('@')[1]
		hash2 = name2?.split('@')[1]
		return hash1? and hash1 == hash2

	@isSameImage: (image1, image2) ->
		return image1.name == image2.name or Images.hasSameDigest(image1.name, image2.name)

	isSameImage: @isSameImage

	_getLocalModeImages: =>
		Promise.join(
			@docker.listImages(filters: label: [ 'io.resin.local.image=1' ])
			@docker.listImages(filters: label: [ 'io.balena.local.image=1' ])
			(legacyImages, currentImages) ->
				_.unionBy(legacyImages, currentImages, 'Id')
		)
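A minimal TypeScript sketch of the status-merging idea behind getStatus() above: images that are currently downloading live in the in-memory volatile state and take precedence over the 'Downloaded' entries derived from Docker and the database. It assumes the Image shape declared in the new images.ts.

import * as _ from 'lodash';
import { Image } from './compose/images';

function mergeStatus(
	available: Image[], // images known to Docker and the database
	volatileState: { [imageId: number]: Image }, // in-flight downloads, keyed by imageId
): Image[] {
	const status = _.clone(volatileState);
	for (const image of available) {
		if (status[image.imageId] == null) {
			status[image.imageId] = {
				...image,
				status: 'Downloaded',
				downloadProgress: null,
			};
		}
	}
	return _.values(status);
}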
src/compose/images.d.ts (vendored, deleted): 7 lines
@@ -1,7 +0,0 @@
import Image from '../types/image';

declare class Images {
	public getStatus(): Image[];
}

export = Images;
src/compose/images.ts (new file): 652 lines
@@ -0,0 +1,652 @@
import * as Bluebird from 'bluebird';
import * as Docker from 'dockerode';
import { EventEmitter } from 'events';
import * as _ from 'lodash';
import StrictEventEmitter from 'strict-event-emitter-types';

import { SchemaReturn } from '../config/schema-type';
import Database from '../db';
import * as constants from '../lib/constants';
import DockerUtils = require('../lib/docker-utils');
import { DeltaStillProcessingError, NotFoundError } from '../lib/errors';
import * as LogTypes from '../lib/log-types';
import * as validation from '../lib/validation';
import Logger from '../logger';

interface ImageEvents {
	change: void;
}

type ImageEventEmitter = StrictEventEmitter<EventEmitter, ImageEvents>;

interface ImageConstructOpts {
	docker: DockerUtils;
	logger: Logger;
	db: Database;
}

interface FetchProgressEvent {
	percentage: number;
}

// TODO: This is copied from src/lib/docker-utils.d.ts but because of the
// export mechanism used, we can't export it. Once we convert docker-utils
// to typescript, remove this
interface DeltaFetchOptions {
	deltaRequestTimeout: number;
	deltaApplyTimeout: number;
	deltaRetryCount: number;
	deltaRetryInterval: number;
	uuid: string;
	currentApiKey: string;
	deltaEndpoint: string;
	apiEndpoint: string;
	deltaSource: string;
	deltaSourceId: string;
	deltaVersion: string;
}

type FetchOptions = SchemaReturn<'fetchOptions'> & { deltaSource?: string };

export interface Image {
	id: number;
	// image registry/repo@digest or registry/repo:tag
	name: string;
	appId: number;
	serviceId: number;
	serviceName: string;
	// Id from balena api
	imageId: number;
	releaseId: number;
	dependent: number;
	dockerImageId: string;
	status: 'Downloading' | 'Downloaded' | 'Deleting';
	downloadProgress: Nullable<number>;
}

// TODO: This is necessary for the format() method, but I'm not sure
// why, and it seems like a bad idea as it is. Fix the need for this.
type MaybeImage = { [key in keyof Image]: Image[key] | null };

// TODO: Remove the need for this type...
type NormalisedDockerImage = Docker.ImageInfo & {
	NormalisedRepoTags: string[];
};

export class Images extends (EventEmitter as {
	new (): ImageEventEmitter;
}) {
	private docker: DockerUtils;
	private logger: Logger;
	private db: Database;

	private imageCleanupFailures: Dictionary<number> = {};
	// A store of volatile state for images (e.g. download progress), indexed by imageId
	private volatileState: { [imageId: number]: Image } = {};

	public constructor(opts: ImageConstructOpts) {
		super();

		this.docker = opts.docker;
		this.logger = opts.logger;
		this.db = opts.db;
	}

	public async triggerFetch(
		image: Image,
		opts: FetchOptions,
		onFinish = _.noop,
	): Promise<null> {
		const onProgress = (progress: FetchProgressEvent) => {
			// Only report the percentage if we haven't finished fetching
			if (this.volatileState[image.imageId] != null) {
				this.reportChange(image.imageId, {
					downloadProgress: progress.percentage,
				});
			}
		};

		let success: boolean;
		try {
			const imageName = await this.normalise(image.name);
			image = _.clone(image);
			image.name = imageName;

			await this.markAsSupervised(image);

			const img = await this.inspectByName(image.name);
			await this.db
				.models('image')
				.update({ dockerImageId: img.Id })
				.where(image);

			onFinish(true);
			return null;
		} catch (e) {
			if (!NotFoundError(e)) {
				throw e;
			}
			this.reportChange(
				image.imageId,
				_.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 }),
			);

			try {
				let id;
				if (opts.delta && opts.deltaSource != null) {
					id = await this.fetchDelta(image, opts, onProgress);
				} else {
					id = await this.fetchImage(image, opts, onProgress);
				}

				await this.db
					.models('image')
					.update({ dockerImageId: id })
					.where(image);

				this.logger.logSystemEvent(LogTypes.downloadImageSuccess, { image });
				success = true;
			} catch (err) {
				if (err instanceof DeltaStillProcessingError) {
					// If this is a delta image pull, and the delta still hasn't finished generating,
					// don't show a failure message, and instead just inform the user that it's remotely
					// processing
					this.logger.logSystemEvent(LogTypes.deltaStillProcessingError, {});
				} else {
					this.logger.logSystemEvent(LogTypes.downloadImageError, {
						image,
						error: err,
					});
				}
				success = false;
			}
		}

		this.reportChange(image.imageId);
		onFinish(success);
		return null;
	}

	public async remove(image: Image): Promise<void> {
		try {
			await this.removeImageIfNotNeeded(image);
		} catch (e) {
			this.logger.logSystemEvent(LogTypes.deleteImageError, {
				image,
				error: e,
			});
			throw e;
		}
	}

	public async getByDockerId(id: string): Promise<Image> {
		return await this.db
			.models('image')
			.where({ dockerImageId: id })
			.first();
	}

	public async removeByDockerId(id: string): Promise<void> {
		const image = await this.getByDockerId(id);
		await this.remove(image);
	}

	private async getNormalisedTags(image: Docker.ImageInfo): Promise<string[]> {
		return await Bluebird.map(
			image.RepoTags != null ? image.RepoTags : [],
			this.normalise.bind(this),
		);
	}

	private withImagesFromDockerAndDB<T>(
		cb: (dockerImages: NormalisedDockerImage[], composeImages: Image[]) => T,
	) {
		return Bluebird.join(
			Bluebird.resolve(this.docker.listImages({ digests: true })).map(image => {
				const newImage: Dictionary<unknown> = _.clone(image);
				newImage.NormalisedRepoTags = this.getNormalisedTags(image);
				return Bluebird.props(newImage);
			}),
			this.db.models('image').select(),
			cb,
		);
	}

	private matchesTagOrDigest(
		image: Image,
		dockerImage: NormalisedDockerImage,
	): boolean {
		return (
			_.includes(dockerImage.NormalisedRepoTags, image.name) ||
			_.some(dockerImage.RepoDigests, digest =>
				Images.hasSameDigest(image.name, digest),
			)
		);
	}

	private isAvailableInDocker(
		image: Image,
		dockerImages: NormalisedDockerImage[],
	): boolean {
		return _.some(
			dockerImages,
			dockerImage =>
				this.matchesTagOrDigest(image, dockerImage) ||
				image.dockerImageId === dockerImage.Id,
		);
	}

	public async getAvailable(_localMode: boolean): Promise<Image[]> {
		const images = await this.withImagesFromDockerAndDB(
			(dockerImages, supervisedImages) =>
				_.filter(supervisedImages, image =>
					this.isAvailableInDocker(image, dockerImages),
				),
		);

		// if (localMode) {
		// 	// Get all images present on the local daemon which are tagged as local images
		// 	return images.concat(await this.getLocalModeImages());
		// }
		return images;
	}

	// TODO: Why does this need a Bluebird.try?
	public getDownloadingImageIds() {
		return Bluebird.try(() =>
			_(this.volatileState)
				.pickBy({ status: 'Downloading' })
				.keys()
				.map(validation.checkInt)
				.value(),
		);
	}

	public async cleanupDatabase(): Promise<void> {
		const imagesToRemove = await this.withImagesFromDockerAndDB(
			async (dockerImages, supervisedImages) => {
				for (const supervisedImage of supervisedImages) {
					// If the supervisor was interrupted between fetching an image and storing its id,
					// some entries in the db might need to have the dockerImageId populated
					if (supervisedImage.dockerImageId == null) {
						const id = _.get(
							_.find(dockerImages, dockerImage =>
								this.matchesTagOrDigest(supervisedImage, dockerImage),
							),
							'Id',
						);

						if (id != null) {
							await this.db
								.models('image')
								.update({ dockerImageId: id })
								.where(supervisedImage);
							supervisedImage.dockerImageId = id;
						}
					}
				}
				return _.filter(
					supervisedImages,
					image => !this.isAvailableInDocker(image, dockerImages),
				);
			},
		);

		const ids = _.map(imagesToRemove, 'id');
		await this.db
			.models('image')
			.del()
			.whereIn('id', ids);
	}

	public async getStatus(localMode: boolean) {
		const images = await this.getAvailable(localMode);
		for (const image of images) {
			image.status = 'Downloaded';
			image.downloadProgress = null;
		}
		const status = _.clone(this.volatileState);
		for (const image of images) {
			if (status[image.imageId] == null) {
				status[image.imageId] = image;
			}
		}
		return _.values(status);
	}

	public async update(image: Image): Promise<void> {
		image = this.format(image);
		await this.db
			.models('image')
			.update(image)
			.where({ name: image.name });
	}

	public async save(image: Image): Promise<void> {
		const img = await this.inspectByName(image.name);
		image = _.clone(image);
		image.dockerImageId = img.Id;
		await this.markAsSupervised(image);
	}

	private async getImagesForCleanup(): Promise<string[]> {
		const images = [];

		const [
			supervisorImageInfo,
			supervisorImage,
			usedImageIds,
		] = await Promise.all([
			this.docker.getRegistryAndName(constants.supervisorImage),
			this.docker.getImage(constants.supervisorImage).inspect(),
			this.db
				.models('image')
				.select('dockerImageId')
				.map((img: Image) => img.dockerImageId),
		]);

		const supervisorRepos = [supervisorImageInfo.imageName];
		// If we're on the new balena/ARCH-supervisor image
		if (_.startsWith(supervisorImageInfo.imageName, 'balena/')) {
			supervisorRepos.push(
				supervisorImageInfo.imageName.replace(/^balena/, 'resin'),
			);
		}

		const isSupervisorRepoTag = ({
			imageName,
			tagName,
		}: {
			imageName: string;
			tagName: string;
		}) => {
			return (
				_.some(supervisorRepos, repo => imageName === repo) &&
				tagName !== supervisorImageInfo.tagName
			);
		};

		const dockerImages = await this.docker.listImages({ digests: true });
		for (const image of dockerImages) {
			// Cleanup should remove truly dangling images (i.e dangling and with no digests)
			if (Images.isDangling(image) && !_.includes(usedImageIds, image.Id)) {
				images.push(image.Id);
			} else if (
				!_.isEmpty(image.RepoTags) &&
				image.Id !== supervisorImage.Id
			) {
				// We also remove images from the supervisor repository with a different tag
				for (const tag of image.RepoTags) {
					const imageNameComponents = await this.docker.getRegistryAndName(tag);
					if (isSupervisorRepoTag(imageNameComponents)) {
						images.push(image.Id);
					}
				}
			}
		}
		const toCleanup = _(images)
			.uniq()
			.filter(
				image =>
					this.imageCleanupFailures[image] == null ||
					Date.now() - this.imageCleanupFailures[image] >
						constants.imageCleanupErrorIgnoreTimeout,
			)
			.value();
		return toCleanup;
	}

	public async inspectByName(
		imageName: string,
	): Promise<Docker.ImageInspectInfo> {
		try {
			return await this.docker.getImage(imageName).inspect();
		} catch (e) {
			if (NotFoundError(e)) {
				const digest = imageName.split('@')[1];
				let imagesFromDb: Image[];
				if (digest != null) {
					imagesFromDb = await this.db
						.models('image')
						.where('name', 'like', `%@${digest}`);
				} else {
					imagesFromDb = await this.db
						.models('image')
						.where({ name: imageName })
						.select();
				}

				for (const image of imagesFromDb) {
					if (image.dockerImageId != null) {
						return await this.docker.getImage(image.dockerImageId).inspect();
					}
				}
			}
			throw e;
		}
	}

	public async isCleanupNeeded() {
		return !_.isEmpty(await this.getImagesForCleanup());
	}

	public async cleanup() {
		const images = await this.getImagesForCleanup();
		for (const image of images) {
			console.log(`Cleaning up ${image}`);
			try {
				await this.docker.getImage(image).remove({ force: true });
				delete this.imageCleanupFailures[image];
			} catch (e) {
				this.logger.logSystemMessage(
					`Error cleaning up ${image}: ${e.message} - will ignore for 1 hour`,
					{ error: e },
					'Image cleanup error',
				);
				this.imageCleanupFailures[image] = Date.now();
			}
		}
	}

	public static isSameImage(image1: Image, image2: Image): boolean {
		return (
			image1.name === image2.name ||
			Images.hasSameDigest(image1.name, image2.name)
		);
	}

	// private async getLocalModeImages() {
	// 	const [legacy, current] = await Promise.all([
	// 		this.docker.listImages({
	// 			filters: { label: ['io.resin.local.image=1'] },
	// 		}),
	// 		this.docker.listImages({
	// 			filters: { label: ['io.balena.local.image=1'] },
	// 		}),
	// 	]);

	// 	const dockerImages = _.unionBy(legacy, current, 'Id');
	// }

	private normalise(imageName: string): Bluebird<string> {
		return this.docker.normaliseImageName(imageName);
	}

	private static isDangling(image: Docker.ImageInfo): boolean {
		return (
			(_.isEmpty(image.RepoTags) ||
				_.isEqual(image.RepoTags, ['<none>:<none>'])) &&
			(_.isEmpty(image.RepoDigests) ||
				_.isEqual(image.RepoDigests, ['<none>@<none>']))
		);
	}

	private static hasSameDigest(
		name1: Nullable<string>,
		name2: Nullable<string>,
	): boolean {
		const hash1 = name1 != null ? name1.split('@')[1] : null;
		const hash2 = name2 != null ? name2.split('@')[1] : null;
		return hash1 != null && hash1 === hash2;
	}

	private async removeImageIfNotNeeded(image: Image): Promise<void> {
		let removed: boolean;

		// We first fetch the image from the DB to ensure it exists,
		// and get the dockerImageId and any other missing fields
		const images = await this.db
			.models('image')
			.select()
			.where(image);

		if (images.length === 0) {
			removed = false;
		}

		const img = images[0];
		try {
			if (img.dockerImageId == null) {
				// Legacy image from before we started using dockerImageId, so we try to remove it
				// by name
				await this.docker.getImage(img.name).remove({ force: true });
				removed = true;
			} else {
				const imagesFromDb = await this.db
					.models('image')
					.where({ dockerImageId: img.dockerImageId })
					.select();
				if (
					imagesFromDb.length === 1 &&
					_.isEqual(this.format(imagesFromDb[0]), this.format(img))
				) {
					this.reportChange(
						image.imageId,
						_.merge(_.clone(image), { status: 'Deleting' }),
					);
					this.logger.logSystemEvent(LogTypes.deleteImage, { image });
					this.docker.getImage(img.dockerImageId).remove({ force: true });
					removed = true;
				} else if (!Images.hasDigest(img.name)) {
					// Image has a regular tag, so we might have to remove unnecessary tags
					const dockerImage = await this.docker
						.getImage(img.dockerImageId)
						.inspect();
					const differentTags = _.reject(imagesFromDb, { name: img.name });

					if (
						dockerImage.RepoTags.length > 1 &&
						_.includes(dockerImage.RepoTags, img.name) &&
						_.some(dockerImage.RepoTags, t =>
							_.some(differentTags, { name: t }),
						)
					) {
						await this.docker.getImage(img.name).remove({ noprune: true });
					}
					removed = false;
				} else {
					removed = false;
				}
			}
		} catch (e) {
			if (NotFoundError(e)) {
				removed = false;
			} else {
				throw e;
			}
		} finally {
			this.reportChange(image.imageId);
		}

		await this.db
			.models('image')
			.del()
			.where({ id: img.id });

		if (removed) {
			this.logger.logSystemEvent(LogTypes.deleteImageSuccess, { image });
		}
	}

	private async markAsSupervised(image: Image): Promise<void> {
		image = this.format(image);
		await this.db.upsertModel('image', image, image);
	}

	private format(image: MaybeImage): Image {
		return _(image)
			.defaults({
				serviceId: null,
				serviceName: null,
				imageId: null,
				releaseId: null,
				dependent: 0,
				dockerImageId: null,
			})
			.omit('id')
			.value() as Image;
	}

	private async fetchDelta(
		image: Image,
		opts: FetchOptions,
		onProgress: (evt: FetchProgressEvent) => void,
	): Promise<string> {
		this.logger.logSystemEvent(LogTypes.downloadImageDelta, { image });

		const deltaOpts = (opts as unknown) as DeltaFetchOptions;
		const srcImage = await this.inspectByName(deltaOpts.deltaSource);

		deltaOpts.deltaSourceId = srcImage.Id;
		const id = await this.docker.fetchDeltaWithProgress(
			image.name,
			deltaOpts,
			onProgress,
		);

		if (!Images.hasDigest(image.name)) {
			const { repo, tag } = await this.docker.getRepoAndTag(image.name);
			await this.docker.getImage(id).tag({ repo, tag });
		}

		return id;
	}

	private fetchImage(
		image: Image,
		opts: FetchOptions,
		onProgress: (evt: FetchProgressEvent) => void,
	): Bluebird<string> {
		this.logger.logSystemEvent(LogTypes.downloadImage, { image });
		return this.docker.fetchImageWithProgress(image.name, opts, onProgress);
	}

	// TODO: find out if imageId can actually be null
	private reportChange(imageId: Nullable<number>, status?: Partial<Image>) {
		if (imageId != null) {
			if (status != null) {
				if (this.volatileState[imageId] == null) {
					this.volatileState[imageId] = { imageId } as Image;
				}
				_.merge(this.volatileState[imageId], status);
				return this.emit('change');
			} else if (this.volatileState[imageId] != null) {
				delete this.volatileState[imageId];
				return this.emit('change');
			}
		}
	}

	private static hasDigest(name: Nullable<string>): boolean {
		if (name == null) {
			return false;
		}
		const parts = name.split('@');
		if (parts[1] == null) {
			return false;
		}
		return true;
	}
}

export default Images;
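A minimal sketch of the cleanup back-off implemented by imageCleanupFailures above: an image whose removal failed is skipped until the ignore timeout has elapsed before being retried. The one-hour value is an assumption taken from the log message; the real value comes from constants.imageCleanupErrorIgnoreTimeout.

const imageCleanupFailures: { [imageId: string]: number } = {};
const imageCleanupErrorIgnoreTimeout = 3600 * 1000; // assumed 1 hour, see lib/constants

function shouldAttemptCleanup(imageId: string, now = Date.now()): boolean {
	const failedAt = imageCleanupFailures[imageId];
	return failedAt == null || now - failedAt > imageCleanupErrorIgnoreTimeout;
}

function recordCleanupFailure(imageId: string): void {
	imageCleanupFailures[imageId] = Date.now();
}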
src/device-api/v2.ts

@@ -149,12 +149,13 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
 	);
 
 	// TODO: Support dependent applications when this feature is complete
-	router.get('/v2/applications/state', (_req: Request, res: Response) => {
+	router.get('/v2/applications/state', async (_req: Request, res: Response) => {
 		// It's kinda hacky to access the services and db via the application manager
 		// maybe refactor this code
+		const localMode = await deviceState.config.get('localMode');
 		Bluebird.join(
 			applications.services.getStatus(),
-			applications.images.getStatus(),
+			applications.images.getStatus(localMode),
 			applications.db.models('app').select(['appId', 'commit', 'name']),
 			(
 				services,

@@ -210,7 +211,7 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
 			response[appName].services[img.serviceName] = {
 				status,
 				releaseId: img.releaseId,
-				downloadProgress: img.downloadProgress,
+				downloadProgress: img.downloadProgress || null,
 			};
 		});
 