mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-02-20 17:52:51 +00:00
Use rwlock to block when images are being pulled
This commit is contained in:
parent
95f4fdb97f
commit
d58517f32d
@ -27,7 +27,8 @@ knex.init.then ->
|
||||
device.getOSVersion()
|
||||
.then (osVersion) ->
|
||||
console.log('Starting API server..')
|
||||
api(application).listen(config.listenPort)
|
||||
apiServer = api(application).listen(config.listenPort)
|
||||
apiServer.timeout = config.apiTimeout
|
||||
# Let API know what version we are, and our api connection info.
|
||||
console.log('Updating supervisor version and api info')
|
||||
device.updateState(
|
||||
|
@ -19,6 +19,7 @@ dockerRoot = checkString(process.env.DOCKER_ROOT) ? '/mnt/root/var/lib/rce'
|
||||
# Defaults needed for both gosuper and node supervisor are declared in entry.sh
|
||||
module.exports = config =
|
||||
apiEndpoint: checkString(process.env.API_ENDPOINT) ? 'https://api.resin.io'
|
||||
apiTimeout: checkInt(process.env.API_TIMEOUT) ? 15 * 60 * 1000
|
||||
listenPort: checkInt(process.env.LISTEN_PORT) ? 80
|
||||
gosuperAddress: "http://unix:#{process.env.GOSUPER_SOCKET}:"
|
||||
deltaHost: checkString(process.env.DELTA_ENDPOINT) ? 'https://delta.resin.io'
|
||||
|
@ -118,61 +118,60 @@ dockerSync = (imgSrc, imgDest, rsyncDiff, conf) ->
|
||||
do ->
|
||||
_lock = new Lock()
|
||||
_writeLock = Promise.promisify(_lock.async.writeLock)
|
||||
lockImages = ->
|
||||
_readLock = Promise.promisify(_lock.async.readLock)
|
||||
writeLockImages = ->
|
||||
_writeLock('images')
|
||||
.disposer (release) ->
|
||||
release()
|
||||
readLockImages = ->
|
||||
_readLock('images')
|
||||
.disposer (release) ->
|
||||
release()
|
||||
|
||||
exports.rsyncImageWithProgress = (imgDest, onProgress, startFromEmpty = false) ->
|
||||
imagesBeingFetched++
|
||||
Promise.try ->
|
||||
if startFromEmpty
|
||||
return 'resin/scratch'
|
||||
findSimilarImage(imgDest)
|
||||
.then (imgSrc) ->
|
||||
rsyncDiff = new Promise (resolve, reject) ->
|
||||
progress request.get("#{config.deltaHost}/api/v1/delta?src=#{imgSrc}&dest=#{imgDest}", timeout: DELTA_REQUEST_TIMEOUT)
|
||||
.on 'progress', (progress) ->
|
||||
onProgress(percentage: progress.percent)
|
||||
.on 'end', ->
|
||||
onProgress(percentage: 100)
|
||||
.on 'response', (res) ->
|
||||
if res.statusCode isnt 200
|
||||
reject(new Error("Got #{res.statusCode} when requesting image from delta server."))
|
||||
else
|
||||
resolve(res)
|
||||
.on 'error', reject
|
||||
.pause()
|
||||
Promise.using readLockImages(), ->
|
||||
Promise.try ->
|
||||
if startFromEmpty
|
||||
return 'resin/scratch'
|
||||
findSimilarImage(imgDest)
|
||||
.then (imgSrc) ->
|
||||
rsyncDiff = new Promise (resolve, reject) ->
|
||||
progress request.get("#{config.deltaHost}/api/v1/delta?src=#{imgSrc}&dest=#{imgDest}", timeout: DELTA_REQUEST_TIMEOUT)
|
||||
.on 'progress', (progress) ->
|
||||
onProgress(percentage: progress.percent)
|
||||
.on 'end', ->
|
||||
onProgress(percentage: 100)
|
||||
.on 'response', (res) ->
|
||||
if res.statusCode isnt 200
|
||||
reject(new Error("Got #{res.statusCode} when requesting image from delta server."))
|
||||
else
|
||||
resolve(res)
|
||||
.on 'error', reject
|
||||
.pause()
|
||||
|
||||
imageConfig = request.getAsync("#{config.deltaHost}/api/v1/config?image=#{imgDest}", {json: true, timeout: 0})
|
||||
.spread ({statusCode}, imageConfig) ->
|
||||
if statusCode isnt 200
|
||||
throw new Error("Invalid configuration: #{imageConfig}")
|
||||
return imageConfig
|
||||
imageConfig = request.getAsync("#{config.deltaHost}/api/v1/config?image=#{imgDest}", {json: true, timeout: 0})
|
||||
.spread ({statusCode}, imageConfig) ->
|
||||
if statusCode isnt 200
|
||||
throw new Error("Invalid configuration: #{imageConfig}")
|
||||
return imageConfig
|
||||
|
||||
return [ rsyncDiff, imageConfig, imgSrc ]
|
||||
.spread (rsyncDiff, imageConfig, imgSrc) ->
|
||||
dockerSync(imgSrc, imgDest, rsyncDiff, imageConfig)
|
||||
.catch OutOfSyncError, (err) ->
|
||||
console.log('Falling back to delta-from-empty')
|
||||
exports.rsyncImageWithProgress(imgDest, onProgress, true)
|
||||
.finally ->
|
||||
imagesBeingFetched--
|
||||
return [ rsyncDiff, imageConfig, imgSrc ]
|
||||
.spread (rsyncDiff, imageConfig, imgSrc) ->
|
||||
dockerSync(imgSrc, imgDest, rsyncDiff, imageConfig)
|
||||
.catch OutOfSyncError, (err) ->
|
||||
console.log('Falling back to delta-from-empty')
|
||||
exports.rsyncImageWithProgress(imgDest, onProgress, true)
|
||||
|
||||
# Keep track of the images being fetched, so we don't clean them up whilst fetching.
|
||||
imagesBeingFetched = 0
|
||||
exports.fetchImageWithProgress = (image, onProgress) ->
|
||||
imagesBeingFetched++
|
||||
dockerProgress.pull(image, onProgress)
|
||||
.finally ->
|
||||
imagesBeingFetched--
|
||||
Promise.using readLockImages(), ->
|
||||
dockerProgress.pull(image, onProgress)
|
||||
|
||||
supervisorTag = config.supervisorImage
|
||||
if !/:/g.test(supervisorTag)
|
||||
# If there is no tag then mark it as latest
|
||||
supervisorTag += ':latest'
|
||||
exports.cleanupContainersAndImages = ->
|
||||
Promise.using lockImages(), ->
|
||||
Promise.using writeLockImages(), ->
|
||||
Promise.join(
|
||||
knex('image').select('repoTag')
|
||||
.map (image) ->
|
||||
@ -214,8 +213,6 @@ do ->
|
||||
console.log('Deleted container:', containerInfo.Id, containerInfo.Image)
|
||||
.catch(_.noop)
|
||||
.then ->
|
||||
# And then clean up the images, as long as we aren't currently trying to fetch any.
|
||||
return if imagesBeingFetched > 0
|
||||
imagesToClean = _.reject images, (image) ->
|
||||
_.any image.RepoTags, (tag) ->
|
||||
return _.contains(appTags, tag) or _.contains(supervisorTags, tag) or _.contains(locallyCreatedTags, tag)
|
||||
@ -252,7 +249,7 @@ do ->
|
||||
repoTag += ':' + tag if tag?
|
||||
else
|
||||
repoTag = buildRepoTag(repo, tag, registry)
|
||||
Promise.using lockImages(), ->
|
||||
Promise.using writeLockImages(), ->
|
||||
knex('image').insert({ repoTag })
|
||||
.then ->
|
||||
if fromImage?
|
||||
@ -265,10 +262,7 @@ do ->
|
||||
res.status(500).send(err?.message or err or 'Unknown error')
|
||||
|
||||
exports.loadImage = (req, res) ->
|
||||
if imagesBeingFetched > 0
|
||||
res.status(500).send('Cannot load an image while the supervisor is pulling an update')
|
||||
return
|
||||
Promise.using lockImages(), ->
|
||||
Promise.using writeLockImages(), ->
|
||||
docker.listImagesAsync()
|
||||
.then (oldImages) ->
|
||||
docker.loadImageAsync(req)
|
||||
@ -287,7 +281,7 @@ do ->
|
||||
|
||||
exports.deleteImage = (req, res) ->
|
||||
imageName = req.params[0]
|
||||
Promise.using lockImages(), ->
|
||||
Promise.using writeLockImages(), ->
|
||||
knex('image').select().where('repoTag', imageName)
|
||||
.then (images) ->
|
||||
throw new Error('Only images created via the Supervisor can be deleted.') if images.length == 0
|
||||
@ -308,7 +302,7 @@ do ->
|
||||
|
||||
docker.modem.dialAsync = Promise.promisify(docker.modem.dial)
|
||||
exports.createContainer = (req, res) ->
|
||||
Promise.using lockImages(), ->
|
||||
Promise.using writeLockImages(), ->
|
||||
knex('image').select().where('repoTag', req.body.Image)
|
||||
.then (images) ->
|
||||
throw new Error('Only images created via the Supervisor can be used for creating containers.') if images.length == 0
|
||||
|
Loading…
x
Reference in New Issue
Block a user