calculate progress in a transform stream and expose a callback for progress events

This commit is contained in:
Aleksis Brezas 2014-11-04 22:50:44 +00:00 committed by Pablo Carranza Vélez
parent 6b496693ad
commit a5ee91ceaf
2 changed files with 56 additions and 54 deletions

View File

@ -99,15 +99,8 @@ exports.start = start = (app) ->
utils.mixpanelTrack('Application install', app)
logSystemEvent('Installing application ' + app.imageId)
updateDeviceInfo(status: 'Downloading')
dockerUtils.fetchImageWithProgress(app.imageId)
.then (stream) ->
return new Promise (resolve, reject) ->
stream.on 'data', (d) ->
data = JSON.parse(d)
if data.progress?
updateDeviceInfo(download_progress: data.progress.percentage)
stream.on('error', reject)
stream.on('end', resolve)
dockerUtils.fetchImageWithProgress app.imageId, (progress) ->
updateDeviceInfo(download_progress: progress.percentage)
.then ->
console.log('Creating container:', app.imageId)
updateDeviceInfo(status: 'Starting')

View File

@ -37,12 +37,23 @@ do ->
# Pull docker image returning a stream that reports progress
# This is less safe than fetchImage and shouldnt be used for supervisor updates
exports.fetchImageWithProgress = (image) ->
exports.fetchImageWithProgress = (image, progressUpdates) ->
imagesBeingFetched++
pullImage(image)
.tap (stream) ->
stream.once 'end', ->
imagesBeingFetched--
docker.createImageAsync(fromImage: image)
.then (stream) ->
return new Promise (resolve, reject) ->
stream.on('error', reject)
.pipe(JSONStream.parse())
.on('error', reject)
.pipe(pullProgressStream(image))
.pipe es.mapSync (data) ->
if data.error?
reject(data.error)
else if data.totalProgress?
progressUpdates?(data.totalProgress)
.on('end', resolve)
.finally ->
imagesBeingFetched--
exports.cleanupContainersAndImages = ->
knex('app').select()
@ -79,7 +90,7 @@ do ->
exports.imageExists = imageExists = (imageId) ->
image = docker.getImage(imageId)
image.inspectAsync().then (data) ->
image.inspectAsync().then ->
return true
.catch (e) ->
return false
@ -107,15 +118,14 @@ do ->
.then (exists) ->
if exists
return 0
else
request("http://#{registry}/v1/images/#{imageId}/json")
.spread (res, data) ->
if res.statusCode >= 400
throw new Error("Failed to get image download size of #{imageId} from #{registry}. Status code: #{res.statusCode}")
return parseInt(res.headers['x-docker-size'])
request("http://#{registry}/v1/images/#{imageId}/json")
.spread (res, data) ->
if res.statusCode >= 400
throw new Error("Failed to get image download size of #{imageId} from #{registry}. Status code: #{res.statusCode}")
return parseInt(res.headers['x-docker-size'])
exports.getLayerDownloadSizes = getLayerDownloadSizes = (image, tag='latest') ->
[ registry, imageName ] = getRegistryAndName(image)
{registry, imageName} = getRegistryAndName(image)
imageSizes = {}
getImageId(registry, imageName, tag)
.then (imageId) ->
@ -140,36 +150,35 @@ do ->
throw new Error("Expected image name including registry domain name")
registry = image.substr(0,slashPos)
imageName = image.substr(slashPos+1)
return [registry,imageName]
# a wrapper for dockerode pull/createImage
# streaming progress of total download size
# instead of per layer progress output of `docker pull`
#
# image has to have the form registry.domain/repository/image
# (it has to include the registry you are pulling from)
exports.pullImage = pullImage = (image) ->
getLayerDownloadSizes(image).then (layerSizes) ->
return {registry, imageName}
# Create a stream that transforms docker pull output
# to include total progress metrics.
#
# The docker pull output should be piped to this stream as separate javascript objects.
pullProgressStream = (image) ->
totalSize = 0
completedSize = 0
currentSize = 0
sizePromise = getLayerDownloadSizes(image).tap (layerSizes) ->
totalSize = _.reduce(layerSizes, ((t,x) -> t+x), 0)
completedSize = 0
currentSize = 0
docker.pullAsync(image)
.then (inStream) ->
inStream.pipe es.through (data) ->
pull = JSON.parse(data)
if pull.status == 'Downloading'
currentSize = pull.progressDetail.current
else if pull.status == 'Download complete'
shortId = pull.id
longId = _.find(_.keys(layerSizes), (id) -> id.indexOf(shortId) is 0)
if longId?
completedSize += layerSizes[longId]
currentSize = 0
layerSizes[longId] = 0 # make sure we don't count this layer again
else
console.log 'Progress error: Unknown layer ' + id + ' downloaded by docker. Progress not correct.'
totalSize = null
downloadedSize = completedSize + currentSize
percentage = calculatePercentage downloadedSize, totalSize
progress = { downloadedSize, totalSize, percentage }
this.emit 'data', JSON.stringify({ progress, data })
es.map (pull, callback) ->
sizePromise.then (layerSizes) ->
if pull.status == 'Downloading'
currentSize = pull.progressDetail.current
else if pull.status == 'Download complete'
shortId = pull.id
longId = _.find(_.keys(layerSizes), (id) -> id.indexOf(shortId) is 0)
if longId?
completedSize += layerSizes[longId]
currentSize = 0
layerSizes[longId] = 0 # make sure we don't count this layer again
else
console.log 'Progress error: Unknown layer ' + id + ' downloaded by docker. Progress not correct.'
totalSize = null
downloadedSize = completedSize + currentSize
percentage = calculatePercentage(downloadedSize, totalSize)
pull.totalProgress = { downloadedSize, totalSize, percentage }
callback(null, pull)
.catch (error) =>
callback(null, { error })