mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2024-12-23 15:32:24 +00:00
Updates by appId and use Promise.using for lock
Change the update cycle to go by appId instead of imageId. Use Promise.using for lockFile locks and unlocks. Now updates shouldn't stop if one of the apps fails to update (it's a step towards better supporting multiple apps). Forcing the lock now works. Remove unnecessary require fs Nicer assignment for s in joinErrorMessages
This commit is contained in:
parent
1eb31ce5f6
commit
d1b317399e
@ -71,16 +71,16 @@ module.exports = (secret) ->
|
|||||||
.then ([ app ]) ->
|
.then ([ app ]) ->
|
||||||
if !app?
|
if !app?
|
||||||
throw new Error('App not found')
|
throw new Error('App not found')
|
||||||
Promise.using application.lockUpdates(), ->
|
Promise.using application.lockUpdates(app), ->
|
||||||
application.lockAndKill(app)
|
application.kill(app)
|
||||||
.then ->
|
.then ->
|
||||||
new Promise (resolve, reject) ->
|
new Promise (resolve, reject) ->
|
||||||
request.post(config.gosuperAddress + '/v1/purge', { json: true, body: applicationId: appId })
|
request.post(config.gosuperAddress + '/v1/purge', { json: true, body: applicationId: appId.toString() })
|
||||||
.on 'error', reject
|
.on 'error', reject
|
||||||
.on 'response', -> resolve()
|
.on 'response', -> resolve()
|
||||||
.pipe(res)
|
.pipe(res)
|
||||||
.finally ->
|
.finally ->
|
||||||
application.startAndUnlock(app)
|
application.start(app)
|
||||||
.catch (err) ->
|
.catch (err) ->
|
||||||
res.status(503).send(err?.message or err or 'Unknown error')
|
res.status(503).send(err?.message or err or 'Unknown error')
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ knex.init.then ->
|
|||||||
console.log('Starting Apps..')
|
console.log('Starting Apps..')
|
||||||
knex('app').select()
|
knex('app').select()
|
||||||
.then (apps) ->
|
.then (apps) ->
|
||||||
Promise.all(apps.map(application.startAndUnlock))
|
Promise.all(apps.map(application.unlockAndStart))
|
||||||
.catch (error) ->
|
.catch (error) ->
|
||||||
console.error('Error starting apps:', error)
|
console.error('Error starting apps:', error)
|
||||||
.then ->
|
.then ->
|
||||||
|
@ -12,7 +12,6 @@ logger = require './lib/logger'
|
|||||||
{ cachedResinApi } = require './request'
|
{ cachedResinApi } = require './request'
|
||||||
device = require './device'
|
device = require './device'
|
||||||
lockFile = Promise.promisifyAll(require('lockfile'))
|
lockFile = Promise.promisifyAll(require('lockfile'))
|
||||||
fs = Promise.promisifyAll(require('fs'))
|
|
||||||
|
|
||||||
{ docker } = dockerUtils
|
{ docker } = dockerUtils
|
||||||
|
|
||||||
@ -110,9 +109,6 @@ exports.kill = kill = (app) ->
|
|||||||
throw err
|
throw err
|
||||||
.tap ->
|
.tap ->
|
||||||
logSystemEvent(logTypes.stopAppSuccess, app)
|
logSystemEvent(logTypes.stopAppSuccess, app)
|
||||||
knex('app').where('id', app.id).delete()
|
|
||||||
.tap ->
|
|
||||||
app.id = null
|
|
||||||
.catch (err) ->
|
.catch (err) ->
|
||||||
logSystemEvent(logTypes.stopAppError, app, err)
|
logSystemEvent(logTypes.stopAppError, app, err)
|
||||||
throw err
|
throw err
|
||||||
@ -192,10 +188,9 @@ exports.start = start = (app) ->
|
|||||||
# Update the app info the moment we create the container, even if then starting the container fails. This
|
# Update the app info the moment we create the container, even if then starting the container fails. This
|
||||||
# stops issues with constantly creating new containers for an image that fails to start.
|
# stops issues with constantly creating new containers for an image that fails to start.
|
||||||
app.containerId = container.id
|
app.containerId = container.id
|
||||||
if app.id?
|
knex('app').update(app).where(appId: app.appId)
|
||||||
knex('app').update(app).where(id: app.id)
|
.then (affectedRows) ->
|
||||||
else
|
knex('app').insert(app) if affectedRows == 0
|
||||||
knex('app').insert(app)
|
|
||||||
.tap (container) ->
|
.tap (container) ->
|
||||||
logSystemEvent(logTypes.startApp, app)
|
logSystemEvent(logTypes.startApp, app)
|
||||||
device.updateState(status: 'Starting')
|
device.updateState(status: 'Starting')
|
||||||
@ -246,52 +241,47 @@ getEnvironment = do ->
|
|||||||
console.error("Failed to get environment for device #{deviceId}, app #{appId}. #{err}")
|
console.error("Failed to get environment for device #{deviceId}, app #{appId}. #{err}")
|
||||||
throw err
|
throw err
|
||||||
|
|
||||||
lockPath = (app) -> "/mnt/root/resin-data/#{app.appId}/resin_updates.lock"
|
lockPath = (app) ->
|
||||||
locked = {}
|
appId = app.appId or app
|
||||||
exports.lockAndKill = lockAndKill = (app, force) ->
|
return "/mnt/root/resin-data/#{appId}/resin_updates.lock"
|
||||||
Promise.try ->
|
|
||||||
fs.unlinkAsync(lockPath(app)) if force == true
|
|
||||||
.then ->
|
|
||||||
locked[app.appId] = true
|
|
||||||
lockFile.lockAsync(lockPath(app))
|
|
||||||
.catch (err) ->
|
|
||||||
if err.code != 'ENOENT'
|
|
||||||
locked[app.appId] = false
|
|
||||||
err = new Error('Updates are locked by application')
|
|
||||||
logSystemEvent(logTypes.stopAppError, app, err)
|
|
||||||
throw err
|
|
||||||
.then ->
|
|
||||||
kill(app)
|
|
||||||
|
|
||||||
exports.startAndUnlock = startAndUnlock = (app) ->
|
# At boot, all apps should be unlocked *before* start to prevent a deadlock
|
||||||
Promise.try ->
|
exports.unlockAndStart = unlockAndStart = (app) ->
|
||||||
throw new Error("Cannot start app because we couldn't acquire lock") if locked[app.appId] == false
|
lockFile.unlockAsync(lockPath(app))
|
||||||
.then ->
|
.then ->
|
||||||
locked[app.appId] = null
|
|
||||||
start(app)
|
start(app)
|
||||||
|
|
||||||
|
exports.lockUpdates = lockUpdates = (app, force) ->
|
||||||
|
Promise.try ->
|
||||||
|
lockFile.unlockAsync(lockPath(app)) if force == true
|
||||||
.then ->
|
.then ->
|
||||||
|
lockFile.lockAsync(lockPath(app))
|
||||||
|
.disposer ->
|
||||||
lockFile.unlockAsync(lockPath(app))
|
lockFile.unlockAsync(lockPath(app))
|
||||||
|
|
||||||
exports.lockUpdates = lockUpdates = do ->
|
joinErrorMessages = (failures) ->
|
||||||
_lock = new Lock()
|
s = if failures.length > 1 then 's' else ''
|
||||||
_lockUpdates = Promise.promisify(_lock.async.writeLock)
|
messages = _.map failures, (err) ->
|
||||||
return -> _lockUpdates().disposer (release) -> release()
|
err.message or err
|
||||||
|
"#{failures.length} error#{s}: #{messages.join(' - ')}"
|
||||||
|
|
||||||
# 0 - Idle
|
# 0 - Idle
|
||||||
# 1 - Updating
|
# 1 - Updating
|
||||||
# 2 - Update required
|
# 2 - Update required
|
||||||
currentlyUpdating = 0
|
currentlyUpdating = 0
|
||||||
failedUpdates = 0
|
failedUpdates = 0
|
||||||
|
forceNextUpdate = false
|
||||||
exports.update = update = (force) ->
|
exports.update = update = (force) ->
|
||||||
if currentlyUpdating isnt 0
|
if currentlyUpdating isnt 0
|
||||||
# Mark an update required after the current.
|
# Mark an update required after the current.
|
||||||
|
forceNextUpdate = force
|
||||||
currentlyUpdating = 2
|
currentlyUpdating = 2
|
||||||
return
|
return
|
||||||
currentlyUpdating = 1
|
currentlyUpdating = 1
|
||||||
Promise.all([
|
Promise.all([
|
||||||
knex('config').select('value').where(key: 'apiKey')
|
knex('config').select('value').where(key: 'apiKey')
|
||||||
knex('config').select('value').where(key: 'uuid')
|
knex('config').select('value').where(key: 'uuid')
|
||||||
Promise.using(lockUpdates(), -> knex('app').select())
|
knex('app').select()
|
||||||
])
|
])
|
||||||
.then ([ [ apiKey ], [ uuid ], apps ]) ->
|
.then ([ [ apiKey ], [ uuid ], apps ]) ->
|
||||||
apiKey = apiKey.value
|
apiKey = apiKey.value
|
||||||
@ -336,44 +326,60 @@ exports.update = update = (force) ->
|
|||||||
env: JSON.stringify(env) # The env has to be stored as a JSON string for knex
|
env: JSON.stringify(env) # The env has to be stored as a JSON string for knex
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteApps = _.indexBy(remoteApps, 'imageId')
|
remoteApps = _.indexBy(remoteApps, 'appId')
|
||||||
remoteImages = _.keys(remoteApps)
|
remoteAppIds = _.keys(remoteApps)
|
||||||
|
|
||||||
apps = _.indexBy(apps, 'imageId')
|
apps = _.indexBy(apps, 'appId')
|
||||||
localApps = _.mapValues apps, (app) ->
|
localApps = _.mapValues apps, (app) ->
|
||||||
_.pick(app, [ 'appId', 'commit', 'imageId', 'env' ])
|
_.pick(app, [ 'appId', 'commit', 'imageId', 'env' ])
|
||||||
localImages = _.keys(localApps)
|
localAppIds = _.keys(localApps)
|
||||||
|
|
||||||
toBeRemoved = _.difference(localImages, remoteImages)
|
toBeRemoved = _.difference(localAppIds, remoteAppIds)
|
||||||
toBeInstalled = _.difference(remoteImages, localImages)
|
toBeInstalled = _.difference(remoteAppIds, localAppIds)
|
||||||
|
|
||||||
toBeUpdated = _.intersection(remoteImages, localImages)
|
toBeUpdated = _.intersection(remoteAppIds, localAppIds)
|
||||||
toBeUpdated = _.filter toBeUpdated, (imageId) ->
|
toBeUpdated = _.filter toBeUpdated, (appId) ->
|
||||||
return !_.isEqual(remoteApps[imageId], localApps[imageId])
|
return !_.isEqual(remoteApps[appId], localApps[appId])
|
||||||
|
|
||||||
|
toBeDownloaded = _.filter toBeUpdated, (appId) ->
|
||||||
|
return !_.isEqual(remoteApps[appId].imageId, localApps[appId].imageId)
|
||||||
|
toBeDownloaded = _.union(toBeDownloaded, toBeInstalled)
|
||||||
|
|
||||||
# Fetch any updated images first
|
# Fetch any updated images first
|
||||||
Promise.map toBeInstalled, (imageId) ->
|
Promise.map toBeDownloaded, (appId) ->
|
||||||
app = remoteApps[imageId]
|
app = remoteApps[appId]
|
||||||
fetch(app)
|
fetch(app)
|
||||||
.then ->
|
.then ->
|
||||||
Promise.using lockUpdates(), ->
|
failures = []
|
||||||
# Then delete all the ones to remove in one go
|
# Then delete all the ones to remove in one go
|
||||||
Promise.map toBeRemoved, (imageId) ->
|
Promise.map toBeRemoved, (appId) ->
|
||||||
lockAndKill(apps[imageId], force)
|
Promise.using lockUpdates(apps[appId], force), ->
|
||||||
.then ->
|
kill(apps[appId])
|
||||||
# Then install the apps and add each to the db as they succeed
|
.then ->
|
||||||
installingPromises = toBeInstalled.map (imageId) ->
|
knex('app').where('appId', appId).delete()
|
||||||
app = remoteApps[imageId]
|
.catch (err) ->
|
||||||
startAndUnlock(app)
|
failures.push(err)
|
||||||
# And remove/recreate updated apps and update db as they succeed
|
.then ->
|
||||||
updatingPromises = toBeUpdated.map (imageId) ->
|
# Then install the apps and add each to the db as they succeed
|
||||||
localApp = apps[imageId]
|
installingPromises = toBeInstalled.map (imageId) ->
|
||||||
app = remoteApps[imageId]
|
app = remoteApps[imageId]
|
||||||
logSystemEvent(logTypes.updateApp, app)
|
start(app)
|
||||||
lockAndKill(localApp, force)
|
.catch (err) ->
|
||||||
|
failures.push(err)
|
||||||
|
# And remove/recreate updated apps and update db as they succeed
|
||||||
|
updatingPromises = toBeUpdated.map (appId) ->
|
||||||
|
localApp = apps[appId]
|
||||||
|
app = remoteApps[appId]
|
||||||
|
logSystemEvent(logTypes.updateApp, app) if localApp.imageId == app.imageId
|
||||||
|
Promise.using lockUpdates(localApp, force), ->
|
||||||
|
kill(localApp)
|
||||||
.then ->
|
.then ->
|
||||||
startAndUnlock(app)
|
start(app)
|
||||||
Promise.all(installingPromises.concat(updatingPromises))
|
.catch (err) ->
|
||||||
|
failures.push(err)
|
||||||
|
Promise.all(installingPromises.concat(updatingPromises))
|
||||||
|
.then ->
|
||||||
|
throw new Error(joinErrorMessages(failures)) if failures.length > 0
|
||||||
.then ->
|
.then ->
|
||||||
failedUpdates = 0
|
failedUpdates = 0
|
||||||
# We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a
|
# We cleanup here as we want a point when we have a consistent apps/images state, rather than potentially at a
|
||||||
@ -387,12 +393,12 @@ exports.update = update = (force) ->
|
|||||||
delayTime = Math.min(failedUpdates * 500, 30000)
|
delayTime = Math.min(failedUpdates * 500, 30000)
|
||||||
# If there was an error then schedule another attempt briefly in the future.
|
# If there was an error then schedule another attempt briefly in the future.
|
||||||
console.log('Scheduling another update attempt due to failure: ', delayTime, err)
|
console.log('Scheduling another update attempt due to failure: ', delayTime, err)
|
||||||
setTimeout(update, delayTime)
|
setTimeout(update, delayTime, force)
|
||||||
.finally ->
|
.finally ->
|
||||||
device.updateState(status: 'Idle')
|
device.updateState(status: 'Idle')
|
||||||
if currentlyUpdating is 2
|
if currentlyUpdating is 2
|
||||||
# If an update is required then schedule it
|
# If an update is required then schedule it
|
||||||
setTimeout(update)
|
setTimeout(update, 1, forceNextUpdate)
|
||||||
.finally ->
|
.finally ->
|
||||||
# Set the updating as finished in its own block, so it never has to worry about other code stopping this.
|
# Set the updating as finished in its own block, so it never has to worry about other code stopping this.
|
||||||
currentlyUpdating = 0
|
currentlyUpdating = 0
|
||||||
|
Loading…
Reference in New Issue
Block a user