Explicitly define the source for deltas, allow cross-app deltas, and iterate serially through apps when updating

This commit changes the way the source for a delta is determined. Previously, we determined
it by comparing the available tags with the one we want, relying on the tag format that
includes the app in the image name. Now we explicitly choose the delta source: the image from the previous app
version if we have one, and otherwise the image from any available app - which allows us
to have a valid source when moving a device between apps.

For this to work consistently even if there's an unexpected reboot, we now avoid deleting an app from the database
until the full update has succeeded. Instead, we mark the app for deletion, so that we still have its image stored after the reboot.

This commit also changes a .map to a .mapSeries when iterating over appIds for removal/install/update. This avoids processing apps in parallel,
which can cause inconsistencies in the status reported to the API.

Change-Type: patch
Signed-off-by: Pablo Carranza Velez <pablo@resin.io>
This commit is contained in:
Pablo Carranza Velez 2017-09-14 13:56:23 -07:00
parent bf83fc3ec1
commit 31d09e70e4
5 changed files with 70 additions and 61 deletions

View File

@ -206,7 +206,7 @@ isValidPort = (port) ->
maybePort = parseInt(port, 10) maybePort = parseInt(port, 10)
return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535 return parseFloat(port) is maybePort and maybePort > 0 and maybePort < 65535
fetch = (app, setDeviceUpdateState = true) -> fetch = (app, { deltaSource, setDeviceUpdateState = true } = {}) ->
onProgress = (progress) -> onProgress = (progress) ->
device.updateState(download_progress: progress.percentage) device.updateState(download_progress: progress.percentage)
@ -226,6 +226,7 @@ fetch = (app, setDeviceUpdateState = true) ->
applyTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_APPLY_TIMEOUT'], positive: true) ? DEFAULT_DELTA_APPLY_TIMEOUT applyTimeout: checkInt(conf['RESIN_SUPERVISOR_DELTA_APPLY_TIMEOUT'], positive: true) ? DEFAULT_DELTA_APPLY_TIMEOUT
retryCount: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_COUNT'], positive: true) retryCount: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_COUNT'], positive: true)
retryInterval: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL'], positive: true) retryInterval: checkInt(conf['RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL'], positive: true)
deltaSource
} }
dockerUtils.rsyncImageWithProgress(app.imageId, deltaOpts, onProgress) dockerUtils.rsyncImageWithProgress(app.imageId, deltaOpts, onProgress)
else else
@ -555,9 +556,9 @@ updateStatus =
intervalHandle: null intervalHandle: null
updateStrategies = updateStrategies =
'download-then-kill': ({ localApp, app, needsDownload, force }) -> 'download-then-kill': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.try -> Promise.try ->
fetch(app) if needsDownload fetch(app, { deltaSource }) if needsDownload
.then -> .then ->
Promise.using lockUpdates(localApp, force), -> Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
@ -568,19 +569,19 @@ updateStrategies =
.catch (err) -> .catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err throw err
'kill-then-download': ({ localApp, app, needsDownload, force }) -> 'kill-then-download': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.using lockUpdates(localApp, force), -> Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
utils.getKnexApp(localApp.appId) utils.getKnexApp(localApp.appId)
.then(kill) .then(kill)
.then -> .then ->
fetch(app) if needsDownload fetch(app, { deltaSource }) if needsDownload
.then -> .then ->
start(app) start(app)
.catch (err) -> .catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err throw err
'delete-then-download': ({ localApp, app, needsDownload, force }) -> 'delete-then-download': ({ localApp, app, needsDownload, force, deltaSource }) ->
Promise.using lockUpdates(localApp, force), -> Promise.using lockUpdates(localApp, force), ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
utils.getKnexApp(localApp.appId) utils.getKnexApp(localApp.appId)
@ -591,18 +592,18 @@ updateStrategies =
if needsDownload if needsDownload
deleteImage(appFromDB) deleteImage(appFromDB)
.then -> .then ->
fetch(app) fetch(app, { deltaSource })
.then -> .then ->
start(app) start(app)
.catch (err) -> .catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError logSystemEvent(logTypes.updateAppError, app, err) unless err instanceof UpdatesLockedError
throw err throw err
'hand-over': ({ localApp, app, needsDownload, force, timeout }) -> 'hand-over': ({ localApp, app, needsDownload, force, timeout, deltaSource }) ->
Promise.using lockUpdates(localApp, force), -> Promise.using lockUpdates(localApp, force), ->
utils.getKnexApp(localApp.appId) utils.getKnexApp(localApp.appId)
.then (localApp) -> .then (localApp) ->
Promise.try -> Promise.try ->
fetch(app) if needsDownload fetch(app, { deltaSource }) if needsDownload
.then -> .then ->
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
start(app) start(app)
@ -647,13 +648,16 @@ parseEnvAndFormatRemoteApps = (remoteApps, uuid, apiKey) ->
env: JSON.stringify(env) env: JSON.stringify(env)
config: JSON.stringify(app.config) config: JSON.stringify(app.config)
name: app.name name: app.name
markedForDeletion: false
} }
Promise.props(appsWithEnv) Promise.props(appsWithEnv)
formatLocalApps = (apps) -> formatLocalApps = (apps) ->
apps = _.keyBy(apps, 'appId') apps = _.keyBy(apps, 'appId')
localApps = _.mapValues apps, (app) -> localApps = _.mapValues apps, (app) ->
app = _.pick(app, [ 'appId', 'commit', 'imageId', 'env', 'config', 'name' ]) app = _.pick(app, [ 'appId', 'commit', 'imageId', 'env', 'config', 'name', 'markedForDeletion' ])
app.markedForDeletion = checkTruthy(app.markedForDeletion)
return app
return localApps return localApps
restartVars = (conf) -> restartVars = (conf) ->
@ -682,6 +686,9 @@ compareForUpdate = (localApps, remoteApps) ->
toBeDownloaded = _.filter toBeUpdated, (appId) -> toBeDownloaded = _.filter toBeUpdated, (appId) ->
return !_.isEqual(remoteApps[appId].imageId, localApps[appId].imageId) return !_.isEqual(remoteApps[appId].imageId, localApps[appId].imageId)
toBeDownloaded = _.union(toBeDownloaded, toBeInstalled) toBeDownloaded = _.union(toBeDownloaded, toBeInstalled)
# The order in this _.union ensures that installations (new appIds) go last, so
# they will happen *after* removals.
allAppIds = _.union(localAppIds, remoteAppIds) allAppIds = _.union(localAppIds, remoteAppIds)
return { toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds } return { toBeRemoved, toBeDownloaded, toBeInstalled, toBeUpdated, appsWithUpdatedConfigs, remoteAppIds, allAppIds }
@ -713,13 +720,6 @@ application.update = update = (force, scheduled = false) ->
utils.setConfig('name', local.name) if local.name != deviceName utils.setConfig('name', local.name) if local.name != deviceName
.then -> .then ->
parseEnvAndFormatRemoteApps(local.apps, uuid, apiKey) parseEnvAndFormatRemoteApps(local.apps, uuid, apiKey)
.tap (remoteApps) ->
# Before running the updates, try to clean up any images that aren't in use
# and will not be used in the target state
return if application.localMode or device.shuttingDown
dockerUtils.cleanupContainersAndImages(_.map(remoteApps, 'imageId'))
.catch (err) ->
console.log('Cleanup failed: ', err, err.stack)
.then (remoteApps) -> .then (remoteApps) ->
localApps = formatLocalApps(apps) localApps = formatLocalApps(apps)
resourcesForUpdate = compareForUpdate(localApps, remoteApps) resourcesForUpdate = compareForUpdate(localApps, remoteApps)
@ -727,8 +727,21 @@ application.update = update = (force, scheduled = false) ->
if !_.isEmpty(toBeRemoved) or !_.isEmpty(toBeInstalled) or !_.isEmpty(toBeUpdated) if !_.isEmpty(toBeRemoved) or !_.isEmpty(toBeInstalled) or !_.isEmpty(toBeUpdated)
device.setUpdateState(update_pending: true) device.setUpdateState(update_pending: true)
# Run special functions against variables deltaSource = {}
Promise.try ->
Promise.map toBeDownloaded, (appId) ->
if _.includes(toBeUpdated, appId)
deltaSource[appId] = localApps[appId].imageId
else if !_.isEmpty(apps) # apps is localApps as an array
deltaSource[appId] = apps[0].imageId
.then ->
# Before running the updates, try to clean up any images that aren't in use
# and will not be used in the target state
return if application.localMode or device.shuttingDown
dockerUtils.cleanupContainersAndImages(_.map(remoteApps, 'imageId').concat(_.values(deltaSource)))
.catch (err) ->
console.log('Cleanup failed: ', err, err.stack)
.then ->
remoteDeviceConfig = {} remoteDeviceConfig = {}
_.map remoteAppIds, (appId) -> _.map remoteAppIds, (appId) ->
_.merge(remoteDeviceConfig, JSON.parse(remoteApps[appId].config)) _.merge(remoteDeviceConfig, JSON.parse(remoteApps[appId].config))
@ -743,7 +756,9 @@ application.update = update = (force, scheduled = false) ->
.catch (err) -> .catch (err) ->
logSystemMessage("Error fetching/applying device configuration: #{err}", { error: err }, 'Set device configuration error') logSystemMessage("Error fetching/applying device configuration: #{err}", { error: err }, 'Set device configuration error')
.return(allAppIds) .return(allAppIds)
.map (appId) -> .mapSeries (appId) ->
# The sorting of allAppIds from compareForUpdate ensures that toBeRemoved is
# done before toBeInstalled, which is the behavior we want.
return if application.localMode or device.shuttingDown return if application.localMode or device.shuttingDown
Promise.try -> Promise.try ->
needsDownload = _.includes(toBeDownloaded, appId) needsDownload = _.includes(toBeDownloaded, appId)
@ -755,14 +770,14 @@ application.update = update = (force, scheduled = false) ->
utils.getKnexApp(appId) utils.getKnexApp(appId)
.then(kill) .then(kill)
.then -> .then ->
knex('app').where('appId', appId).delete() knex('app').where('appId', appId).update(markedForDeletion: true)
.catch (err) -> .catch (err) ->
logSystemEvent(logTypes.updateAppError, app, err) logSystemEvent(logTypes.updateAppError, app, err)
throw err throw err
else if _.includes(toBeInstalled, appId) else if _.includes(toBeInstalled, appId)
app = remoteApps[appId] app = remoteApps[appId]
Promise.try -> Promise.try ->
fetch(app) if needsDownload fetch(app, { deltaSource: deltaSource[appId] }) if needsDownload
.then -> .then ->
start(app) start(app)
else if _.includes(toBeUpdated, appId) else if _.includes(toBeUpdated, appId)
@ -779,6 +794,7 @@ application.update = update = (force, scheduled = false) ->
needsDownload needsDownload
force: force || forceThisApp force: force || forceThisApp
timeout timeout
deltaSource: deltaSource[appId]
} }
else if _.includes(appsWithUpdatedConfigs, appId) else if _.includes(appsWithUpdatedConfigs, appId)
# These apps have no changes other than config variables. # These apps have no changes other than config variables.
@ -806,7 +822,9 @@ application.update = update = (force, scheduled = false) ->
device.setUpdateState(update_pending: false, update_downloaded: false, update_failed: false) device.setUpdateState(update_pending: false, update_downloaded: false, update_failed: false)
# We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a # We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a
# point where we might clean up an image we still want. # point where we might clean up an image we still want.
dockerUtils.cleanupContainersAndImages() knex('app').where(markedForDeletion: true).del()
.then ->
dockerUtils.cleanupContainersAndImages()
.catch (err) -> .catch (err) ->
updateStatus.failed++ updateStatus.failed++
device.setUpdateState(update_failed: true) device.setUpdateState(update_failed: true)
@ -865,7 +883,7 @@ application.initialize = ->
listenToEvents() listenToEvents()
getAndApplyDeviceConfig() getAndApplyDeviceConfig()
.then -> .then ->
knex('app').select() knex('app').whereNot(markedForDeletion: true).select()
.map (app) -> .map (app) ->
unlockAndStart(app) if !application.localMode and !device.shuttingDown unlockAndStart(app) if !application.localMode and !device.shuttingDown
.catch (error) -> .catch (error) ->

View File

@ -5,6 +5,7 @@ knex = Knex(
client: 'sqlite3' client: 'sqlite3'
connection: connection:
filename: '/data/database.sqlite' filename: '/data/database.sqlite'
useNullAsDefault: true
) )
addColumn = (table, column, type) -> addColumn = (table, column, type) ->
@ -46,15 +47,19 @@ knex.init = Promise.all([
t.boolean('privileged') t.boolean('privileged')
t.json('env') t.json('env')
t.json('config') t.json('config')
t.boolean('markedForDeletion')
else else
Promise.all [ Promise.all [
addColumn('app', 'commit', 'string') addColumn('app', 'commit', 'string')
addColumn('app', 'appId', 'string') addColumn('app', 'appId', 'string')
addColumn('app', 'config', 'json') addColumn('app', 'config', 'json')
addColumn('app', 'markedForDeletion', 'boolean')
] ]
.then -> .then ->
# When updating from older supervisors, config can be null # When updating from older supervisors, config can be null
knex('app').update({ config: '{}' }).whereNull('config') knex('app').update({ config: '{}' }).whereNull('config')
.then ->
knex('app').update({ markedForDeletion: false }).whereNull('markedForDeletion')
knex.schema.hasTable('dependentApp') knex.schema.hasTable('dependentApp')
.then (exists) -> .then (exists) ->

View File

@ -10,39 +10,10 @@ _ = require 'lodash'
knex = require './db' knex = require './db'
{ request, resumable } = require './request' { request, resumable } = require './request'
Lock = require 'rwlock' Lock = require 'rwlock'
utils = require './utils'
rimraf = Promise.promisify(require('rimraf'))
exports.docker = docker = new Docker() exports.docker = docker = new Docker()
dockerProgress = new DockerProgress(dockerToolbelt: docker) dockerProgress = new DockerProgress(dockerToolbelt: docker)
# Create an array of (repoTag, image_id, created) tuples like the output of `docker images`
listRepoTags = ->
docker.listImages()
.then (images) ->
images = _.orderBy(images, 'Created', [ false ])
ret = []
for image in images
for repoTag in image.RepoTags
ret.push [ repoTag, image.Id, image.Created ]
return ret
# Find either the most recent image of the same app or the image of the supervisor.
# Returns an image Id or Tag (depending on whatever's available)
findSimilarImage = (repoTag) ->
application = repoTag.split('/')[1]
listRepoTags()
.then (repoTags) ->
# Find the most recent image of the same application
for repoTag in repoTags
otherApplication = repoTag[0].split('/')[1]
if otherApplication is application
return repoTag[0]
# Otherwise we start from scratch
return 'resin/scratch'
getRepoAndTag = (image) -> getRepoAndTag = (image) ->
docker.getRegistryAndName(image) docker.getRegistryAndName(image)
.then ({ registry, imageName, tagName }) -> .then ({ registry, imageName, tagName }) ->
@ -82,13 +53,21 @@ do ->
.disposer (release) -> .disposer (release) ->
release() release()
exports.rsyncImageWithProgress = (imgDest, { requestTimeout, applyTimeout, retryCount, retryInterval, uuid, apiKey, startFromEmpty = false }, onProgress) -> exports.rsyncImageWithProgress = (imgDest, opts, onProgress) ->
{ requestTimeout, applyTimeout, retryCount, retryInterval, uuid, apiKey, deltaSource, startFromEmpty = false } = opts
Promise.using readLockImages(), -> Promise.using readLockImages(), ->
Promise.try -> Promise.try ->
if startFromEmpty if startFromEmpty or !deltaSource?
return 'resin/scratch' return 'resin/scratch'
findSimilarImage(imgDest) else
docker.getImage(deltaSource).inspect()
.then ->
return deltaSource
.catch ->
return 'resin/scratch'
.then (imgSrc) -> .then (imgSrc) ->
# I'll leave this debug log here in case we ever wonder what delta source a device is using in production
console.log("Using delta source #{imgSrc}")
Promise.join docker.getRegistryAndName(imgDest), docker.getRegistryAndName(imgSrc), (dstInfo, srcInfo) -> Promise.join docker.getRegistryAndName(imgDest), docker.getRegistryAndName(imgSrc), (dstInfo, srcInfo) ->
tokenEndpoint = "#{config.apiEndpoint}/auth/v1/token" tokenEndpoint = "#{config.apiEndpoint}/auth/v1/token"
opts = opts =
@ -132,8 +111,10 @@ do ->
.then ({ repo, tag }) -> .then ({ repo, tag }) ->
docker.getImage(id).tag({ repo, tag, force: true }) docker.getImage(id).tag({ repo, tag, force: true })
.catch dockerDelta.OutOfSyncError, (err) -> .catch dockerDelta.OutOfSyncError, (err) ->
throw err if startFromEmpty
console.log('Falling back to delta-from-empty') console.log('Falling back to delta-from-empty')
exports.rsyncImageWithProgress(imgDest, { requestTimeout, totalTimeout, uuid, apiKey, startFromEmpty: true }, onProgress) opts.startFromEmpty = true
exports.rsyncImageWithProgress(imgDest, opts, onProgress)
exports.fetchImageWithProgress = (image, onProgress, { uuid, apiKey }) -> exports.fetchImageWithProgress = (image, onProgress, { uuid, apiKey }) ->
Promise.using readLockImages(), -> Promise.using readLockImages(), ->

View File

@ -17,8 +17,8 @@ exports.checkString = (s) ->
return s return s
exports.checkTruthy = (v) -> exports.checkTruthy = (v) ->
if v == '1' or v == 'true' or v == true or v == 'on' if v in [ '1', 'true', true, 'on', 1 ]
return true return true
if v == '0' or v == 'false' or v == false or v == 'off' if v in [ '0', 'false', false, 'off', 0 ]
return false return false
return return

View File

@ -243,7 +243,12 @@ exports.fetchAndSetTargetsForDependentApps = (state, fetchFn, apiKey) ->
return app.commit? and !_.some(remoteApps, imageId: app.imageId) return app.commit? and !_.some(remoteApps, imageId: app.imageId)
toBeDeletedFromDB = _(localApps).reject((app, appId) -> remoteApps[appId]?).map('appId').value() toBeDeletedFromDB = _(localApps).reject((app, appId) -> remoteApps[appId]?).map('appId').value()
Promise.map toBeDownloaded, (app) -> Promise.map toBeDownloaded, (app) ->
fetchFn(app, false) deltaSource = null
if localApps[app.appId]?
deltaSource = localApps[app.appId].imageId
else if !_.isEmpty(localDependentApps)
deltaSource = localDependentApps[0].imageId
fetchFn(app, { deltaSource, setDeviceUpdateState: false })
.then -> .then ->
Promise.map toBeRemoved, (app) -> Promise.map toBeRemoved, (app) ->
fs.unlinkAsync(tarPath(app)) fs.unlinkAsync(tarPath(app))