Gzip while streaming, rather than gzipping the buffer up front

Connects-To: #549
This commit is contained in:
Tim Perry 2017-06-13 19:28:37 +02:00
parent e584dc43f7
commit df3c5ca07f
5 changed files with 76 additions and 91 deletions

View File

@ -1,5 +1,5 @@
// Generated by CoffeeScript 1.12.6 // Generated by CoffeeScript 1.12.6
var Promise, dockerUtils, formatImageName, getBuilderLogPushEndpoint, getBuilderPushEndpoint, getBundleInfo, parseInput, performUpload, renderProgress, showPushProgress, uploadLogs, uploadToPromise; var Promise, dockerUtils, formatImageName, getBuilderLogPushEndpoint, getBuilderPushEndpoint, getBundleInfo, parseInput, performUpload, renderProgress, showPushProgress, updatePushProgress, uploadLogs, uploadToPromise;
Promise = require('bluebird'); Promise = require('bluebird');
@ -64,27 +64,23 @@ renderProgress = function(percentage, stepCount) {
return bar + " " + (percentage.toFixed(1)) + "%"; return bar + " " + (percentage.toFixed(1)) + "%";
}; };
showPushProgress = function(imageSize, request, logStreams, timeout) { showPushProgress = function(logStreams) {
var ansiEscapes, logging, progressReporter; var logging;
if (timeout == null) { logging = require('../utils/logging');
timeout = 250; return logging.logInfo(logStreams, renderProgress(0));
} };
updatePushProgress = function(percentage, logStreams) {
var ansiEscapes, logging;
logging = require('../utils/logging'); logging = require('../utils/logging');
ansiEscapes = require('ansi-escapes'); ansiEscapes = require('ansi-escapes');
logging.logInfo(logStreams, 'Initializing...'); if (percentage >= 100) {
return progressReporter = setInterval(function() { percentage = 100;
var percent, sent; }
sent = request.req.connection._bytesDispatched; process.stdout.write(ansiEscapes.cursorUp(1));
percent = (sent / imageSize) * 100; process.stdout.clearLine();
if (percent >= 100) { process.stdout.cursorTo(0);
clearInterval(progressReporter); return logging.logInfo(logStreams, renderProgress(percentage));
percent = 100;
}
process.stdout.write(ansiEscapes.cursorUp(1));
process.stdout.clearLine();
process.stdout.cursorTo(0);
return logging.logInfo(logStreams, renderProgress(percent));
}, timeout);
}; };
getBundleInfo = function(options) { getBundleInfo = function(options) {
@ -95,9 +91,20 @@ getBundleInfo = function(options) {
}); });
}; };
performUpload = function(gzippedImage, token, username, url, appName, logStreams) { performUpload = function(imageStream, token, username, url, appName, logStreams) {
var request, uploadRequest; var progressStream, request, streamWithProgress, uploadRequest, zlib;
request = require('request'); request = require('request');
progressStream = require('progress-stream');
zlib = require('zlib');
showPushProgress(logStreams);
streamWithProgress = imageStream.pipe(progressStream({
time: 500,
length: imageStream.length
}, function(arg) {
var percentage;
percentage = arg.percentage;
return updatePushProgress(percentage, logStreams);
}));
uploadRequest = request.post({ uploadRequest = request.post({
url: getBuilderPushEndpoint(url, username, appName), url: getBuilderPushEndpoint(url, username, appName),
headers: { headers: {
@ -106,9 +113,9 @@ performUpload = function(gzippedImage, token, username, url, appName, logStreams
auth: { auth: {
bearer: token bearer: token
}, },
body: gzippedImage.stream body: streamWithProgress.pipe(zlib.createGzip())
}); });
return uploadToPromise(uploadRequest, gzippedImage.size, logStreams); return uploadToPromise(uploadRequest, logStreams);
}; };
uploadLogs = function(logs, token, url, buildId, username, appName) { uploadLogs = function(logs, token, url, buildId, username, appName) {
@ -124,7 +131,7 @@ uploadLogs = function(logs, token, url, buildId, username, appName) {
}); });
}; };
uploadToPromise = function(uploadRequest, size, logStreams) { uploadToPromise = function(uploadRequest, logStreams) {
var logging; var logging;
logging = require('../utils/logging'); logging = require('../utils/logging');
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
@ -156,8 +163,7 @@ uploadToPromise = function(uploadRequest, size, logStreams) {
return reject(new Error("Received unexpected reply from remote: " + data)); return reject(new Error("Received unexpected reply from remote: " + data));
} }
}; };
uploadRequest.on('error', reject).on('data', handleMessage); return uploadRequest.on('error', reject).on('data', handleMessage);
return showPushProgress(size, uploadRequest, logStreams);
}); });
}; };
@ -219,7 +225,7 @@ module.exports = {
var buildLogs, imageName; var buildLogs, imageName;
imageName = arg1.image, buildLogs = arg1.log; imageName = arg1.image, buildLogs = arg1.log;
logs = buildLogs; logs = buildLogs;
return Promise.all([dockerUtils.gzipAndBufferImage(docker, imageName, bufferFile), token, username, url, params.appName, logStreams]).spread(performUpload); return Promise.all([dockerUtils.bufferImage(docker, imageName, bufferFile), token, username, url, params.appName, logStreams]).spread(performUpload);
})["finally"](function() { })["finally"](function() {
return require('mz/fs').unlink(bufferFile)["catch"](_.noop); return require('mz/fs').unlink(bufferFile)["catch"](_.noop);
}); });

View File

@ -273,24 +273,15 @@ exports.runBuild = function(params, options, getBundleInfo, logStreams) {
}); });
}; };
exports.gzipAndBufferImage = function(docker, imageId, bufferFile) { exports.bufferImage = function(docker, imageId, bufferFile) {
var fs, image, streamUtils, zlib; var Promise, image, imageMetadata, streamUtils;
Promise = require('bluebird');
streamUtils = require('./streams'); streamUtils = require('./streams');
zlib = require('zlib');
fs = require('mz/fs');
image = docker.getImage(imageId); image = docker.getImage(imageId);
return image.get().then(function(imageStream) { imageMetadata = image.inspectAsync();
var gzippedStream; return Promise.all([image.get(), imageMetadata.get('Size')]).spread(function(imageStream, imageSize) {
gzippedStream = imageStream.pipe(zlib.createGzip()); return streamUtils.buffer(imageStream, bufferFile).tap(function(bufferedStream) {
return streamUtils.buffer(gzippedStream, bufferFile); return bufferedStream.length = imageSize;
}).then(function(bufferedStream) {
return fs.stat(bufferFile).then(function(stats) {
var size;
size = stats.size;
return {
stream: bufferedStream,
size: stats.size
};
}); });
}); });
}; };
@ -304,10 +295,6 @@ exports.getDocker = function(options) {
return new Docker(connectOpts); return new Docker(connectOpts);
}; };
exports.getImageSize = function(docker, image) {
return docker.getImage(image).inspectAsync().get('Size');
};
hasQemu = function() { hasQemu = function() {
var fs; var fs;
fs = require('mz/fs'); fs = require('mz/fs');

View File

@ -41,22 +41,20 @@ renderProgress = (percentage, stepCount = 50) ->
bar = "[#{_.repeat('=', barCount)}>#{_.repeat(' ', spaceCount)}]" bar = "[#{_.repeat('=', barCount)}>#{_.repeat(' ', spaceCount)}]"
return "#{bar} #{percentage.toFixed(1)}%" return "#{bar} #{percentage.toFixed(1)}%"
showPushProgress = (imageSize, request, logStreams, timeout = 250) -> showPushProgress = (logStreams) ->
logging = require('../utils/logging')
logging.logInfo(logStreams, renderProgress(0))
updatePushProgress = (percentage, logStreams) ->
logging = require('../utils/logging') logging = require('../utils/logging')
ansiEscapes = require('ansi-escapes') ansiEscapes = require('ansi-escapes')
logging.logInfo(logStreams, 'Initializing...') if percentage >= 100
progressReporter = setInterval -> percentage = 100
sent = request.req.connection._bytesDispatched process.stdout.write(ansiEscapes.cursorUp(1))
percent = (sent / imageSize) * 100 process.stdout.clearLine()
if percent >= 100 process.stdout.cursorTo(0)
clearInterval(progressReporter) logging.logInfo(logStreams, renderProgress(percentage))
percent = 100
process.stdout.write(ansiEscapes.cursorUp(1))
process.stdout.clearLine()
process.stdout.cursorTo(0)
logging.logInfo(logStreams, renderProgress(percent))
, timeout
getBundleInfo = (options) -> getBundleInfo = (options) ->
helpers = require('../utils/helpers') helpers = require('../utils/helpers')
@ -65,17 +63,26 @@ getBundleInfo = (options) ->
.then (app) -> .then (app) ->
[app.arch, app.device_type] [app.arch, app.device_type]
performUpload = (gzippedImage, token, username, url, appName, logStreams) -> performUpload = (imageStream, token, username, url, appName, logStreams) ->
request = require('request') request = require('request')
progressStream = require('progress-stream')
zlib = require('zlib')
showPushProgress(logStreams)
streamWithProgress = imageStream.pipe(progressStream({
time: 500,
length: imageStream.length
}, ({ percentage }) -> updatePushProgress(percentage, logStreams)))
uploadRequest = request.post uploadRequest = request.post
url: getBuilderPushEndpoint(url, username, appName) url: getBuilderPushEndpoint(url, username, appName)
headers: headers:
'Content-Encoding': 'gzip' 'Content-Encoding': 'gzip'
auth: auth:
bearer: token bearer: token
body: gzippedImage.stream body: streamWithProgress.pipe(zlib.createGzip())
uploadToPromise(uploadRequest, gzippedImage.size, logStreams) uploadToPromise(uploadRequest, logStreams)
uploadLogs = (logs, token, url, buildId, username, appName) -> uploadLogs = (logs, token, url, buildId, username, appName) ->
request = require('request') request = require('request')
@ -86,7 +93,7 @@ uploadLogs = (logs, token, url, buildId, username, appName) ->
bearer: token bearer: token
body: Buffer.from(logs) body: Buffer.from(logs)
uploadToPromise = (uploadRequest, size, logStreams) -> uploadToPromise = (uploadRequest, logStreams) ->
logging = require('../utils/logging') logging = require('../utils/logging')
new Promise (resolve, reject) -> new Promise (resolve, reject) ->
@ -115,10 +122,6 @@ uploadToPromise = (uploadRequest, size, logStreams) ->
.on('error', reject) .on('error', reject)
.on('data', handleMessage) .on('data', handleMessage)
# Set up upload reporting
showPushProgress(size, uploadRequest, logStreams)
module.exports = module.exports =
signature: 'deploy <appName> [image]' signature: 'deploy <appName> [image]'
description: 'Deploy a container to a resin.io application' description: 'Deploy a container to a resin.io application'
@ -189,7 +192,7 @@ module.exports =
.then ({ image: imageName, log: buildLogs }) -> .then ({ image: imageName, log: buildLogs }) ->
logs = buildLogs logs = buildLogs
Promise.all [ Promise.all [
dockerUtils.gzipAndBufferImage(docker, imageName, bufferFile) dockerUtils.bufferImage(docker, imageName, bufferFile)
token token
username username
url url

View File

@ -283,25 +283,18 @@ exports.runBuild = (params, options, getBundleInfo, logStreams) ->
# Given an image id or tag, export the image to a tar archive, # Given an image id or tag, export the image to a tar archive,
# gzip the result, and buffer it to disk. # gzip the result, and buffer it to disk.
# Returns a { stream, size } object exports.bufferImage = (docker, imageId, bufferFile) ->
exports.gzipAndBufferImage = (docker, imageId, bufferFile) -> Promise = require('bluebird')
streamUtils = require('./streams') streamUtils = require('./streams')
zlib = require('zlib')
fs = require('mz/fs')
image = docker.getImage(imageId) image = docker.getImage(imageId)
image.get() imageMetadata = image.inspectAsync()
.then (imageStream) ->
gzippedStream = imageStream.pipe(zlib.createGzip()) Promise.all([image.get(), imageMetadata.get('Size')])
streamUtils.buffer(gzippedStream, bufferFile) .spread (imageStream, imageSize) ->
.then (bufferedStream) -> streamUtils.buffer(imageStream, bufferFile)
fs.stat(bufferFile) .tap (bufferedStream) ->
.then (stats) -> bufferedStream.length = imageSize
size = stats.size
return {
stream: bufferedStream,
size: stats.size
}
exports.getDocker = (options) -> exports.getDocker = (options) ->
Docker = require('dockerode') Docker = require('dockerode')
@ -311,10 +304,6 @@ exports.getDocker = (options) ->
connectOpts['Promise'] = Promise connectOpts['Promise'] = Promise
new Docker(connectOpts) new Docker(connectOpts)
exports.getImageSize = (docker, image) ->
docker.getImage(image).inspectAsync()
.get('Size')
hasQemu = -> hasQemu = ->
fs = require('mz/fs') fs = require('mz/fs')
@ -381,4 +370,3 @@ copyQemu = (context) ->
.then -> .then ->
fs.chmod(binPath, '755') fs.chmod(binPath, '755')
.return(binPath) .return(binPath)

View File

@ -63,6 +63,7 @@
"nplugm": "^3.0.0", "nplugm": "^3.0.0",
"president": "^2.0.1", "president": "^2.0.1",
"prettyjson": "^1.1.3", "prettyjson": "^1.1.3",
"progress-stream": "^2.0.0",
"raven": "^1.2.0", "raven": "^1.2.0",
"reconfix": "^0.0.3", "reconfix": "^0.0.3",
"request": "^2.81.0", "request": "^2.81.0",