Merge pull request #1 from resin-io/rsync-deltas

Rsync deltas
This commit is contained in:
Pablo Carranza Vélez 2016-01-28 15:40:01 -03:00
commit 5597ab0a85
11 changed files with 192 additions and 12 deletions

View File

@ -1,3 +1,5 @@
* Add support for delta image download [petrosagg and Pablo]
# v1.4.0 # v1.4.0
* Report Host OS version to the API [Pablo] * Report Host OS version to the API [Pablo]

View File

@ -4,7 +4,13 @@ COPY 01_nodoc /etc/dpkg/dpkg.cfg.d/
# Supervisor apt dependencies # Supervisor apt dependencies
RUN apt-get -q update \ RUN apt-get -q update \
&& apt-get install -qqy socat supervisor --no-install-recommends \ && apt-get install -qqy \
btrfs-tools \
ca-certificates \
rsync \
socat \
supervisor \
--no-install-recommends \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/ && rm -rf /var/lib/apt/lists/

View File

@ -4,7 +4,13 @@ COPY 01_nodoc /etc/dpkg/dpkg.cfg.d/
# Supervisor apt dependencies # Supervisor apt dependencies
RUN apt-get -q update \ RUN apt-get -q update \
&& apt-get install -qqy socat supervisor --no-install-recommends \ && apt-get install -qqy \
btrfs-tools \
ca-certificates \
rsync \
socat \
supervisor \
--no-install-recommends \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/ && rm -rf /var/lib/apt/lists/

View File

@ -4,7 +4,13 @@ COPY 01_nodoc /etc/dpkg/dpkg.cfg.d/
# Supervisor apt dependencies # Supervisor apt dependencies
RUN apt-get -q update \ RUN apt-get -q update \
&& apt-get install -qqy socat supervisor --no-install-recommends \ && apt-get install -qqy \
btrfs-tools \
ca-certificates \
rsync \
socat \
supervisor \
--no-install-recommends \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/ && rm -rf /var/lib/apt/lists/

View File

@ -4,7 +4,13 @@ COPY 01_nodoc /etc/dpkg/dpkg.cfg.d/
# Supervisor apt dependencies # Supervisor apt dependencies
RUN apt-get -q update \ RUN apt-get -q update \
&& apt-get install -qqy socat supervisor --no-install-recommends \ && apt-get install -qqy \
btrfs-tools \
ca-certificates \
rsync \
socat \
supervisor \
--no-install-recommends \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/ && rm -rf /var/lib/apt/lists/

View File

@ -4,7 +4,13 @@ COPY 01_nodoc /etc/dpkg/dpkg.cfg.d/
# Supervisor apt dependencies # Supervisor apt dependencies
RUN apt-get -q update \ RUN apt-get -q update \
&& apt-get install -qqy socat supervisor --no-install-recommends \ && apt-get install -qqy \
btrfs-tools \
ca-certificates \
rsync \
socat \
supervisor \
--no-install-recommends \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/ && rm -rf /var/lib/apt/lists/

BIN
empty.tar Normal file

Binary file not shown.

View File

@ -10,8 +10,8 @@
"bluebird": "^2.9.24", "bluebird": "^2.9.24",
"body-parser": "^1.12.0", "body-parser": "^1.12.0",
"coffee-script": "~1.9.1", "coffee-script": "~1.9.1",
"docker-progress": "^1.1.0", "docker-progress": "^2.0.1",
"dockerode": "~2.2.1", "dockerode": "~2.2.9",
"event-stream": "^3.0.20", "event-stream": "^3.0.20",
"express": "^4.0.0", "express": "^4.0.0",
"knex": "~0.8.3", "knex": "~0.8.3",
@ -24,6 +24,7 @@
"pubnub": "^3.7.13", "pubnub": "^3.7.13",
"request": "^2.51.0", "request": "^2.51.0",
"resin-register-device": "^2.0.0", "resin-register-device": "^2.0.0",
"request-progress": "^0.3.1",
"rwlock": "^5.0.0", "rwlock": "^5.0.0",
"server-destroy": "^1.0.0", "server-destroy": "^1.0.0",
"sqlite3": "~3.0.4", "sqlite3": "~3.0.4",

View File

@ -126,12 +126,21 @@ isValidPort = (port) ->
return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535 return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535
fetch = (app) -> fetch = (app) ->
onProgress = (progress) ->
device.updateState(download_progress: progress.percentage)
docker.getImage(app.imageId).inspectAsync() docker.getImage(app.imageId).inspectAsync()
.catch (error) -> .catch (error) ->
logSystemEvent(logTypes.downloadApp, app) logSystemEvent(logTypes.downloadApp, app)
device.updateState(status: 'Downloading', download_progress: 0) device.updateState(status: 'Downloading', download_progress: 0)
dockerUtils.fetchImageWithProgress app.imageId, (progress) ->
device.updateState(download_progress: progress.percentage) Promise.try ->
JSON.parse(app.env)
.then (env) ->
if env['RESIN_SUPERVISOR_DELTA'] == '1'
dockerUtils.rsyncImageWithProgress(app.imageId, onProgress)
else
dockerUtils.fetchImageWithProgress(app.imageId, onProgress)
.then -> .then ->
logSystemEvent(logTypes.downloadAppSuccess, app) logSystemEvent(logTypes.downloadAppSuccess, app)
device.updateState(status: 'Idle', download_progress: null) device.updateState(status: 'Idle', download_progress: null)
@ -577,7 +586,7 @@ application.update = update = (force) ->
if updateStatus.state is UPDATE_REQUIRED if updateStatus.state is UPDATE_REQUIRED
console.log('Updating failed, but there is already another update scheduled immediately: ', err) console.log('Updating failed, but there is already another update scheduled immediately: ', err)
return return
delayTime = Math.min(updateStatus.failed * 500, 30000) delayTime = Math.min((2 ** updateStatus.failed) * 500, 30000)
# If there was an error then schedule another attempt briefly in the future. # If there was an error then schedule another attempt briefly in the future.
console.log('Scheduling another update attempt due to failure: ', delayTime, err) console.log('Scheduling another update attempt due to failure: ', delayTime, err)
setTimeout(update, delayTime, force) setTimeout(update, delayTime, force)

View File

@ -13,11 +13,14 @@ checkValidKey = (s) ->
return return
return s return s
dockerRoot = process.env.DOCKER_ROOT ? '/mnt/root/var/lib/rce'
# Defaults needed for both gosuper and node supervisor are declared in entry.sh # Defaults needed for both gosuper and node supervisor are declared in entry.sh
module.exports = config = module.exports = config =
apiEndpoint: process.env.API_ENDPOINT ? 'https://api.resin.io' apiEndpoint: process.env.API_ENDPOINT ? 'https://api.resin.io'
listenPort: process.env.LISTEN_PORT ? 80 listenPort: process.env.LISTEN_PORT ? 80
gosuperAddress: "http://unix:#{process.env.GOSUPER_SOCKET}:" gosuperAddress: "http://unix:#{process.env.GOSUPER_SOCKET}:"
deltaHost: process.env.DELTA_ENDPOINT ? 'https://delta.resin.io'
registryEndpoint: process.env.REGISTRY_ENDPOINT ? 'registry.resin.io' registryEndpoint: process.env.REGISTRY_ENDPOINT ? 'registry.resin.io'
pubnub: pubnub:
subscribe_key: checkValidKey(process.env.PUBNUB_SUBSCRIBE_KEY) ? process.env.DEFAULT_PUBNUB_SUBSCRIBE_KEY subscribe_key: checkValidKey(process.env.PUBNUB_SUBSCRIBE_KEY) ? process.env.DEFAULT_PUBNUB_SUBSCRIBE_KEY
@ -37,3 +40,5 @@ module.exports = config =
vpnStatusPath: process.env.VPN_STATUS_PATH ? '/mnt/root/run/openvpn/vpn_status' vpnStatusPath: process.env.VPN_STATUS_PATH ? '/mnt/root/run/openvpn/vpn_status'
checkInt: checkInt checkInt: checkInt
hostOsVersionPath: process.env.HOST_OS_VERSION_PATH ? '/mnt/root/etc/os-release' hostOsVersionPath: process.env.HOST_OS_VERSION_PATH ? '/mnt/root/etc/os-release'
dockerRoot: dockerRoot
btrfsRoot: process.env.BTRFS_ROOT ? "#{dockerRoot}/btrfs/subvolumes"

View File

@ -1,9 +1,16 @@
Docker = require 'dockerode' Docker = require 'dockerode'
DockerProgress = require 'docker-progress' { getRegistryAndName, DockerProgress } = require 'docker-progress'
Promise = require 'bluebird' Promise = require 'bluebird'
{ spawn, execAsync } = Promise.promisifyAll require 'child_process'
progress = require 'request-progress'
config = require './config' config = require './config'
_ = require 'lodash' _ = require 'lodash'
knex = require './db' knex = require './db'
TypedError = require 'typed-error'
{ request } = require './request'
fs = Promise.promisifyAll require 'fs'
class OutOfSyncError extends TypedError
docker = Promise.promisifyAll(new Docker(socketPath: config.dockerSocket)) docker = Promise.promisifyAll(new Docker(socketPath: config.dockerSocket))
# Hack dockerode to promisify internal classes' prototypes # Hack dockerode to promisify internal classes' prototypes
@ -13,6 +20,133 @@ Promise.promisifyAll(docker.getContainer().constructor.prototype)
exports.docker = docker exports.docker = docker
dockerProgress = new DockerProgress(socketPath: config.dockerSocket) dockerProgress = new DockerProgress(socketPath: config.dockerSocket)
# Create an array of (repoTag, image_id, created) tuples like the output of `docker images`
listRepoTagsAsync = ->
docker.listImagesAsync()
.then (images) ->
images = _.sortByOrder(images, 'Created', [ false ])
ret = []
for image in images
for repoTag in image.RepoTags
ret.push [ repoTag, image.Id, image.Created ]
return ret
# Find either the most recent image of the same app or the image of the supervisor.
# Returns an image Id or Tag (depending on whatever's available)
findSimilarImage = (repoTag) ->
application = repoTag.split('/')[1]
listRepoTagsAsync()
.then (repoTags) ->
# Find the most recent image of the same application
for repoTag in repoTags
otherApplication = repoTag[0].split('/')[1]
if otherApplication is application
return repoTag[0]
# Otherwise return the image for the most specific supervisor tag (commit hash)
for repoTag in repoTags when /resin\/.*-supervisor.*:[0-9a-f]{6}/.test(repoTag[0])
return repoTag[0]
# Or return *any* supervisor image available (except latest which is usually a phony tag)
for repoTag in repoTags when /resin\/.*-supervisor.*:(?!latest)/.test(repoTag[0])
return repoTag[0]
# If all else fails, return the newest image available
for repoTag in repoTags when repoTag[0] isnt '<none>:<none>'
return repoTag[0]
return 'resin/scratch'
DELTA_OUT_OF_SYNC_CODES = [23, 24]
exports.rsyncImageWithProgress = (imgDest, onProgress, startFromEmpty = false) ->
Promise.try ->
if startFromEmpty
return 'resin/scratch'
findSimilarImage(imgDest)
.then (imgSrc) ->
rsyncDiff = new Promise (resolve, reject) ->
progress request.get("#{config.deltaHost}/api/v1/delta?src=#{imgSrc}&dest=#{imgDest}", timeout: 5 * 60 * 1000)
.on 'progress', (progress) ->
onProgress(percentage: progress.percent)
.on 'end', ->
onProgress(percentage: 100)
.on 'response', (res) ->
if res.statusCode isnt 200
reject(new Error("Got #{res.statusCode} when requesting image from delta server."))
else
resolve(res)
.on 'error', reject
.pause()
imageConfig = request.getAsync("#{config.deltaHost}/api/v1/config?image=#{imgDest}", {json: true, timeout: 0})
.spread ({statusCode}, imageConfig) ->
if statusCode isnt 200
throw new Error("Invalid configuration: #{imageConfig}")
return imageConfig
return [ rsyncDiff, imageConfig, imgSrc ]
.spread (rsyncDiff, imageConfig, imgSrc) ->
dockerSync(imgSrc, imgDest, rsyncDiff, imageConfig)
.catch OutOfSyncError, (err) ->
console.log('Falling back to delta-from-empty')
exports.rsyncImageWithProgress(imgDest, onProgress, true)
getRepoAndTag = (image) ->
getRegistryAndName(image)
.then ({ registry, imageName, tagName }) ->
registry = registry.toString().replace(':443','')
return { repo: "#{registry}/#{imageName}", tag: tagName }
dockerSync = (imgSrc, imgDest, rsyncDiff, conf) ->
docker.importImageAsync('/app/empty.tar')
.then (stream) ->
new Promise (resolve, reject) ->
streamOutput = ''
stream.on 'data', (data) ->
streamOutput += data
stream.on 'error', reject
stream.on 'end', ->
resolve(JSON.parse(streamOutput).status)
.then (destId) ->
jsonPath = "#{config.dockerRoot}/graph/#{destId}/json"
fs.readFileAsync(jsonPath)
.then(JSON.parse)
.then (destJson) ->
destJson.config = conf
fs.writeFileAsync(jsonPath + '.tmp', JSON.stringify(destJson))
.then ->
fs.renameAsync(jsonPath + '.tmp', jsonPath)
.then ->
if imgSrc isnt 'resin/scratch'
execAsync("btrfs subvolume delete \"#{config.btrfsRoot}/#{destId}\"")
.then ->
docker.getImage(imgSrc).inspectAsync().get('Id')
.then (srcId) ->
execAsync("btrfs subvolume snapshot \"#{config.btrfsRoot}/#{srcId}\" \"#{config.btrfsRoot}/#{destId}\"")
.then ->
new Promise (resolve, reject) ->
rsync = spawn('rsync', ['--timeout=300', '--archive', '--delete' , '--read-batch=-', "#{config.btrfsRoot}/#{destId}"], stdio: 'pipe')
.on 'error', reject
.on 'exit', (code, signal) ->
if code in DELTA_OUT_OF_SYNC_CODES
reject(new OutOfSyncError('Incompatible image'))
else if code isnt 0
reject(new Error("rsync exited. code: #{code} signal: #{signal}"))
else
resolve()
rsyncDiff.pipe(rsync.stdin)
rsync.stdout.pipe(process.stdout)
rsync.stderr.pipe(process.stdout)
rsyncDiff.resume()
.then ->
execAsync('sync')
.then ->
getRepoAndTag(imgDest)
.then ({ repo, tag }) ->
docker.getImage(destId).tagAsync({ repo, tag, force: true })
do -> do ->
# Keep track of the images being fetched, so we don't clean them up whilst fetching. # Keep track of the images being fetched, so we don't clean them up whilst fetching.
imagesBeingFetched = 0 imagesBeingFetched = 0
@ -77,4 +211,3 @@ do ->
docker.getContainer(id).inspectAsync() docker.getContainer(id).inspectAsync()
.then (data) -> .then (data) ->
return not data.State.Running return not data.State.Running