mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-02-23 02:32:43 +00:00
Merge pull request #877 from balena-io/typescript
Typescript conversions
This commit is contained in:
commit
2a105723b4
@ -8,14 +8,14 @@ path = require 'path'
|
|||||||
|
|
||||||
constants = require './lib/constants'
|
constants = require './lib/constants'
|
||||||
|
|
||||||
Docker = require './lib/docker-utils'
|
{ DockerUtils: Docker } = require './lib/docker-utils'
|
||||||
{ LocalModeManager } = require './local-mode'
|
{ LocalModeManager } = require './local-mode'
|
||||||
updateLock = require './lib/update-lock'
|
updateLock = require './lib/update-lock'
|
||||||
{ checkTruthy, checkInt, checkString } = require './lib/validation'
|
{ checkTruthy, checkInt, checkString } = require './lib/validation'
|
||||||
{ NotFoundError } = require './lib/errors'
|
{ NotFoundError } = require './lib/errors'
|
||||||
{ pathExistsOnHost } = require './lib/fs-utils'
|
{ pathExistsOnHost } = require './lib/fs-utils'
|
||||||
|
|
||||||
ServiceManager = require './compose/service-manager'
|
{ ServiceManager } = require './compose/service-manager'
|
||||||
{ Service } = require './compose/service'
|
{ Service } = require './compose/service'
|
||||||
{ Images } = require './compose/images'
|
{ Images } = require './compose/images'
|
||||||
{ NetworkManager } = require './compose/network-manager'
|
{ NetworkManager } = require './compose/network-manager'
|
||||||
@ -689,8 +689,7 @@ module.exports = class ApplicationManager extends EventEmitter
|
|||||||
normaliseAndExtendAppFromDB: (app) =>
|
normaliseAndExtendAppFromDB: (app) =>
|
||||||
Promise.join(
|
Promise.join(
|
||||||
@config.get('extendedEnvOptions')
|
@config.get('extendedEnvOptions')
|
||||||
@docker.getNetworkGateway(constants.supervisorNetworkInterface)
|
@docker.getNetworkGateway(constants.supervisorNetworkInterface).catch(-> '127.0.0.1')
|
||||||
.catchReturn('127.0.0.1')
|
|
||||||
Promise.props({
|
Promise.props({
|
||||||
firmware: pathExistsOnHost('/lib/firmware')
|
firmware: pathExistsOnHost('/lib/firmware')
|
||||||
modules: pathExistsOnHost('/lib/modules')
|
modules: pathExistsOnHost('/lib/modules')
|
||||||
|
2
src/application-manager.d.ts
vendored
2
src/application-manager.d.ts
vendored
@ -7,7 +7,7 @@ import { Logger } from './logger';
|
|||||||
import { EventTracker } from './event-tracker';
|
import { EventTracker } from './event-tracker';
|
||||||
|
|
||||||
import Images from './compose/images';
|
import Images from './compose/images';
|
||||||
import ServiceManager = require('./compose/service-manager');
|
import ServiceManager from './compose/service-manager';
|
||||||
import DB from './db';
|
import DB from './db';
|
||||||
|
|
||||||
import { Service } from './compose/service';
|
import { Service } from './compose/service';
|
||||||
|
@ -4,10 +4,13 @@ import { EventEmitter } from 'events';
|
|||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
import StrictEventEmitter from 'strict-event-emitter-types';
|
import StrictEventEmitter from 'strict-event-emitter-types';
|
||||||
|
|
||||||
import { SchemaReturn } from '../config/schema-type';
|
|
||||||
import Database from '../db';
|
import Database from '../db';
|
||||||
import * as constants from '../lib/constants';
|
import * as constants from '../lib/constants';
|
||||||
import DockerUtils = require('../lib/docker-utils');
|
import {
|
||||||
|
DeltaFetchOptions,
|
||||||
|
DockerUtils,
|
||||||
|
FetchOptions,
|
||||||
|
} from '../lib/docker-utils';
|
||||||
import { DeltaStillProcessingError, NotFoundError } from '../lib/errors';
|
import { DeltaStillProcessingError, NotFoundError } from '../lib/errors';
|
||||||
import * as LogTypes from '../lib/log-types';
|
import * as LogTypes from '../lib/log-types';
|
||||||
import * as validation from '../lib/validation';
|
import * as validation from '../lib/validation';
|
||||||
@ -29,25 +32,6 @@ interface FetchProgressEvent {
|
|||||||
percentage: number;
|
percentage: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This is copied from src/lib/docker-utils.d.ts but because of the
|
|
||||||
// export mechanism used, we can't export it. Once we convert docker-utils
|
|
||||||
// to typescript, remove this
|
|
||||||
interface DeltaFetchOptions {
|
|
||||||
deltaRequestTimeout: number;
|
|
||||||
deltaApplyTimeout: number;
|
|
||||||
deltaRetryCount: number;
|
|
||||||
deltaRetryInterval: number;
|
|
||||||
uuid: string;
|
|
||||||
currentApiKey: string;
|
|
||||||
deltaEndpoint: string;
|
|
||||||
apiEndpoint: string;
|
|
||||||
deltaSource: string;
|
|
||||||
deltaSourceId: string;
|
|
||||||
deltaVersion: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
type FetchOptions = SchemaReturn<'fetchOptions'> & { deltaSource?: string };
|
|
||||||
|
|
||||||
export interface Image {
|
export interface Image {
|
||||||
id: number;
|
id: number;
|
||||||
// image registry/repo@digest or registry/repo:tag
|
// image registry/repo@digest or registry/repo:tag
|
||||||
@ -133,7 +117,7 @@ export class Images extends (EventEmitter as {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
let id;
|
let id;
|
||||||
if (opts.delta && opts.deltaSource != null) {
|
if (opts.delta && (opts as DeltaFetchOptions).deltaSource != null) {
|
||||||
id = await this.fetchDelta(image, opts, onProgress);
|
id = await this.fetchDelta(image, opts, onProgress);
|
||||||
} else {
|
} else {
|
||||||
id = await this.fetchImage(image, opts, onProgress);
|
id = await this.fetchImage(image, opts, onProgress);
|
||||||
@ -598,7 +582,7 @@ export class Images extends (EventEmitter as {
|
|||||||
image: Image,
|
image: Image,
|
||||||
opts: FetchOptions,
|
opts: FetchOptions,
|
||||||
onProgress: (evt: FetchProgressEvent) => void,
|
onProgress: (evt: FetchProgressEvent) => void,
|
||||||
): Bluebird<string> {
|
): Promise<string> {
|
||||||
this.logger.logSystemEvent(LogTypes.downloadImage, { image });
|
this.logger.logSystemEvent(LogTypes.downloadImage, { image });
|
||||||
return this.docker.fetchImageWithProgress(image.name, opts, onProgress);
|
return this.docker.fetchImageWithProgress(image.name, opts, onProgress);
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import * as Bluebird from 'bluebird';
|
import * as Bluebird from 'bluebird';
|
||||||
import { fs } from 'mz';
|
|
||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
|
import { fs } from 'mz';
|
||||||
|
|
||||||
import * as constants from '../lib/constants';
|
import * as constants from '../lib/constants';
|
||||||
import Docker = require('../lib/docker-utils');
|
import Docker from '../lib/docker-utils';
|
||||||
import { ENOENT, NotFoundError } from '../lib/errors';
|
import { ENOENT, NotFoundError } from '../lib/errors';
|
||||||
import { Logger } from '../logger';
|
import { Logger } from '../logger';
|
||||||
import { Network, NetworkOptions } from './network';
|
import { Network, NetworkOptions } from './network';
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import * as Bluebird from 'bluebird';
|
import * as Bluebird from 'bluebird';
|
||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
|
|
||||||
import Docker = require('../lib/docker-utils');
|
import Docker from '../lib/docker-utils';
|
||||||
import { InvalidAppIdError, NotFoundError } from '../lib/errors';
|
import { InvalidAppIdError, NotFoundError } from '../lib/errors';
|
||||||
import logTypes = require('../lib/log-types');
|
import logTypes = require('../lib/log-types');
|
||||||
import { checkInt } from '../lib/validation';
|
import { checkInt } from '../lib/validation';
|
||||||
|
@ -1,331 +0,0 @@
|
|||||||
Promise = require 'bluebird'
|
|
||||||
_ = require 'lodash'
|
|
||||||
EventEmitter = require 'events'
|
|
||||||
JSONStream = require 'JSONStream'
|
|
||||||
fs = Promise.promisifyAll(require('fs'))
|
|
||||||
|
|
||||||
logTypes = require '../lib/log-types'
|
|
||||||
{ checkInt, isValidDeviceName } = require '../lib/validation'
|
|
||||||
constants = require '../lib/constants'
|
|
||||||
|
|
||||||
{ Service } = require './service'
|
|
||||||
|
|
||||||
{ NotFoundError } = require '../lib/errors'
|
|
||||||
|
|
||||||
module.exports = class ServiceManager extends EventEmitter
|
|
||||||
constructor: ({ @docker, @logger, @config }) ->
|
|
||||||
@containerHasDied = {}
|
|
||||||
@listening = false
|
|
||||||
# Volatile state of containers, indexed by containerId (or random strings if we don't have a containerId yet)
|
|
||||||
@volatileState = {}
|
|
||||||
|
|
||||||
killAllLegacy: =>
|
|
||||||
# Containers haven't been normalized (this is an updated supervisor)
|
|
||||||
# so we need to stop and remove them
|
|
||||||
@docker.getImage(constants.supervisorImage).inspect()
|
|
||||||
.then (supervisorImage) =>
|
|
||||||
Promise.map @docker.listContainers(all: true), (container) =>
|
|
||||||
if container.ImageID != supervisorImage.Id
|
|
||||||
@_killContainer(container.Id, { serviceName: 'legacy', image: container.ImageID }, { removeContainer: true })
|
|
||||||
|
|
||||||
reportChange: (containerId, status) ->
|
|
||||||
if status?
|
|
||||||
@volatileState[containerId] ?= {}
|
|
||||||
_.merge(@volatileState[containerId], status)
|
|
||||||
else if containerId? and @volatileState[containerId]?
|
|
||||||
delete @volatileState[containerId]
|
|
||||||
@emit('change')
|
|
||||||
|
|
||||||
reportNewStatus: (containerId, service, status) =>
|
|
||||||
@reportChange(containerId, _.merge({ status }, _.pick(service, [ 'imageId', 'appId', 'releaseId', 'commit' ])))
|
|
||||||
|
|
||||||
_killContainer: (containerId, service = {}, { removeContainer = true, wait = false }) =>
|
|
||||||
Promise.try =>
|
|
||||||
@logger.logSystemEvent(logTypes.stopService, { service })
|
|
||||||
if service.imageId?
|
|
||||||
@reportNewStatus(containerId, service, 'Stopping')
|
|
||||||
containerObj = @docker.getContainer(containerId)
|
|
||||||
killPromise = containerObj.stop().then ->
|
|
||||||
if removeContainer
|
|
||||||
containerObj.remove(v: true)
|
|
||||||
.catch (err) =>
|
|
||||||
# Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison
|
|
||||||
# reasons.
|
|
||||||
statusCode = checkInt(err.statusCode)
|
|
||||||
# 304 means the container was already stopped - so we can just remove it
|
|
||||||
if statusCode is 304
|
|
||||||
@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
|
|
||||||
if removeContainer
|
|
||||||
return containerObj.remove(v: true)
|
|
||||||
return
|
|
||||||
# 404 means the container doesn't exist, precisely what we want! :D
|
|
||||||
if statusCode is 404
|
|
||||||
@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
|
|
||||||
return
|
|
||||||
throw err
|
|
||||||
.tap =>
|
|
||||||
delete @containerHasDied[containerId]
|
|
||||||
@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
|
|
||||||
.catch (err) =>
|
|
||||||
@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
|
|
||||||
.finally =>
|
|
||||||
if service.imageId?
|
|
||||||
@reportChange(containerId)
|
|
||||||
if wait
|
|
||||||
return killPromise
|
|
||||||
return null
|
|
||||||
|
|
||||||
kill: (service, { removeContainer = true, wait = false } = {}) =>
|
|
||||||
@_killContainer(service.containerId, service, { removeContainer, wait })
|
|
||||||
|
|
||||||
remove: (service) =>
|
|
||||||
@logger.logSystemEvent(logTypes.removeDeadService, { service })
|
|
||||||
@get(service)
|
|
||||||
.then (existingService) =>
|
|
||||||
@docker.getContainer(existingService.containerId).remove(v: true)
|
|
||||||
.catchReturn(NotFoundError, null)
|
|
||||||
.tapCatch (err) =>
|
|
||||||
@logger.logSystemEvent(logTypes.removeDeadServiceError, { service, error: err })
|
|
||||||
|
|
||||||
getAllByAppId: (appId) =>
|
|
||||||
@getAll("app-id=#{appId}")
|
|
||||||
|
|
||||||
stopAllByAppId: (appId) =>
|
|
||||||
Promise.map @getAllByAppId(appId), (service) =>
|
|
||||||
@kill(service, { removeContainer: false })
|
|
||||||
|
|
||||||
create: (service) =>
|
|
||||||
mockContainerId = @config.newUniqueKey()
|
|
||||||
@get(service)
|
|
||||||
.then (existingService) =>
|
|
||||||
return @docker.getContainer(existingService.containerId)
|
|
||||||
.catch NotFoundError, =>
|
|
||||||
|
|
||||||
@config.get('name')
|
|
||||||
.then (deviceName) =>
|
|
||||||
if !isValidDeviceName(deviceName)
|
|
||||||
throw new Error(
|
|
||||||
'The device name contains a newline, which is unsupported by balena. ' +
|
|
||||||
'Please fix the device name.'
|
|
||||||
)
|
|
||||||
|
|
||||||
conf = service.toDockerContainer({ deviceName })
|
|
||||||
nets = service.extraNetworksToJoin()
|
|
||||||
|
|
||||||
@logger.logSystemEvent(logTypes.installService, { service })
|
|
||||||
@reportNewStatus(mockContainerId, service, 'Installing')
|
|
||||||
|
|
||||||
@docker.createContainer(conf)
|
|
||||||
.tap (container) =>
|
|
||||||
service.containerId = container.id
|
|
||||||
|
|
||||||
Promise.all _.map nets, (endpointConfig, name) =>
|
|
||||||
@docker
|
|
||||||
.getNetwork(name)
|
|
||||||
.connect({ Container: container.id, EndpointConfig: endpointConfig })
|
|
||||||
.tap =>
|
|
||||||
@logger.logSystemEvent(logTypes.installServiceSuccess, { service })
|
|
||||||
.tapCatch (err) =>
|
|
||||||
@logger.logSystemEvent(logTypes.installServiceError, { service, error: err })
|
|
||||||
.finally =>
|
|
||||||
@reportChange(mockContainerId)
|
|
||||||
|
|
||||||
start: (service) =>
|
|
||||||
alreadyStarted = false
|
|
||||||
containerId = null
|
|
||||||
@create(service)
|
|
||||||
.tap (container) =>
|
|
||||||
containerId = container.id
|
|
||||||
@logger.logSystemEvent(logTypes.startService, { service })
|
|
||||||
@reportNewStatus(containerId, service, 'Starting')
|
|
||||||
container.start()
|
|
||||||
.catch (err) =>
|
|
||||||
statusCode = checkInt(err.statusCode)
|
|
||||||
# 304 means the container was already started, precisely what we want :)
|
|
||||||
if statusCode is 304
|
|
||||||
alreadyStarted = true
|
|
||||||
return
|
|
||||||
|
|
||||||
if statusCode is 500 and err.message?.trim?()?.match(/exec format error$/)
|
|
||||||
# Provide a friendlier error message for "exec format error"
|
|
||||||
@config.get('deviceType')
|
|
||||||
.then (deviceType) ->
|
|
||||||
throw new Error("Application architecture incompatible with #{deviceType}: exec format error")
|
|
||||||
else
|
|
||||||
# rethrow the same error
|
|
||||||
throw err
|
|
||||||
.tapCatch (err) =>
|
|
||||||
# If starting the container failed, we remove it so that it doesn't litter
|
|
||||||
container.remove(v: true)
|
|
||||||
.catchReturn()
|
|
||||||
.then =>
|
|
||||||
@logger.logSystemEvent(logTypes.startServiceError, { service, error: err })
|
|
||||||
.then =>
|
|
||||||
@logger.attach(@docker, container.id, service)
|
|
||||||
.tap =>
|
|
||||||
if alreadyStarted
|
|
||||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
|
||||||
else
|
|
||||||
@logger.logSystemEvent(logTypes.startServiceSuccess, { service })
|
|
||||||
.tap ->
|
|
||||||
service.running = true
|
|
||||||
.finally =>
|
|
||||||
@reportChange(containerId)
|
|
||||||
|
|
||||||
_listWithBothLabels: (labelList) =>
|
|
||||||
listWithPrefix = (prefix) =>
|
|
||||||
@docker.listContainers({ all: true, filters: label: _.map(labelList, (v) -> prefix + v) })
|
|
||||||
Promise.join(
|
|
||||||
listWithPrefix('io.resin.')
|
|
||||||
listWithPrefix('io.balena.')
|
|
||||||
(legacyContainers, currentContainers) ->
|
|
||||||
_.unionBy(legacyContainers, currentContainers, 'Id')
|
|
||||||
)
|
|
||||||
|
|
||||||
# Gets all existing containers that correspond to apps
|
|
||||||
getAll: (extraLabelFilters = []) =>
|
|
||||||
filterLabels = [ 'supervised' ].concat(extraLabelFilters)
|
|
||||||
@_listWithBothLabels(filterLabels)
|
|
||||||
.mapSeries (container) =>
|
|
||||||
@docker.getContainer(container.Id).inspect()
|
|
||||||
.then(Service.fromDockerContainer)
|
|
||||||
.then (service) =>
|
|
||||||
if @volatileState[service.containerId]?.status?
|
|
||||||
service.status = @volatileState[service.containerId].status
|
|
||||||
return service
|
|
||||||
.catchReturn(NotFoundError, null)
|
|
||||||
.filter(_.negate(_.isNil))
|
|
||||||
|
|
||||||
# Returns the first container matching a service definition
|
|
||||||
get: (service) =>
|
|
||||||
@getAll("service-id=#{service.serviceId}")
|
|
||||||
.filter((currentService) -> currentService.isEqualConfig(service))
|
|
||||||
.then (services) ->
|
|
||||||
if services.length == 0
|
|
||||||
e = new Error('Could not find a container matching this service definition')
|
|
||||||
e.statusCode = 404
|
|
||||||
throw e
|
|
||||||
return services[0]
|
|
||||||
|
|
||||||
getStatus: =>
|
|
||||||
@getAll()
|
|
||||||
.then (services) =>
|
|
||||||
status = _.clone(@volatileState)
|
|
||||||
for service in services
|
|
||||||
status[service.containerId] ?= _.pick(service, [
|
|
||||||
'appId',
|
|
||||||
'imageId',
|
|
||||||
'status',
|
|
||||||
'releaseId',
|
|
||||||
'commit',
|
|
||||||
'createdAt',
|
|
||||||
'serviceName',
|
|
||||||
])
|
|
||||||
return _.values(status)
|
|
||||||
|
|
||||||
getByDockerContainerId: (containerId) =>
|
|
||||||
@docker.getContainer(containerId).inspect()
|
|
||||||
.then (container) ->
|
|
||||||
if !(container.Config.Labels['io.balena.supervised']? or container.Config.Labels['io.resin.supervised']?)
|
|
||||||
return null
|
|
||||||
return Service.fromDockerContainer(container)
|
|
||||||
|
|
||||||
waitToKill: (service, timeout) ->
|
|
||||||
pollInterval = 100
|
|
||||||
timeout = checkInt(timeout, positive: true) ? 60000
|
|
||||||
deadline = Date.now() + timeout
|
|
||||||
|
|
||||||
handoverCompletePaths = service.handoverCompleteFullPathsOnHost()
|
|
||||||
|
|
||||||
wait = ->
|
|
||||||
Promise.any _.map handoverCompletePaths, (file) ->
|
|
||||||
fs.statAsync(file)
|
|
||||||
.then ->
|
|
||||||
fs.unlinkAsync(file).catch(_.noop)
|
|
||||||
.catch ->
|
|
||||||
if Date.now() < deadline
|
|
||||||
Promise.delay(pollInterval).then(wait)
|
|
||||||
else
|
|
||||||
console.log('Handover timeout has passed, assuming handover was completed')
|
|
||||||
|
|
||||||
console.log('Waiting for handover to be completed')
|
|
||||||
wait()
|
|
||||||
.then ->
|
|
||||||
console.log('Handover complete')
|
|
||||||
|
|
||||||
prepareForHandover: (service) =>
|
|
||||||
@get(service)
|
|
||||||
.then (svc) =>
|
|
||||||
container = @docker.getContainer(svc.containerId)
|
|
||||||
container.update(RestartPolicy: {})
|
|
||||||
.then ->
|
|
||||||
container.rename(name: "old_#{service.serviceName}_#{service.imageId}_#{service.releaseId}")
|
|
||||||
|
|
||||||
updateMetadata: (service, { imageId, releaseId }) =>
|
|
||||||
@get(service)
|
|
||||||
.then (svc) =>
|
|
||||||
@docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{imageId}_#{releaseId}")
|
|
||||||
.then =>
|
|
||||||
@reportChange()
|
|
||||||
|
|
||||||
handover: (currentService, targetService) =>
|
|
||||||
# We set the running container to not restart so that in case of a poweroff
|
|
||||||
# it doesn't come back after boot.
|
|
||||||
@prepareForHandover(currentService)
|
|
||||||
.then =>
|
|
||||||
@start(targetService)
|
|
||||||
.then =>
|
|
||||||
@waitToKill(currentService, targetService.config.labels['io.balena.update.handover-timeout'])
|
|
||||||
.then =>
|
|
||||||
@kill(currentService)
|
|
||||||
|
|
||||||
listenToEvents: =>
|
|
||||||
if @listening
|
|
||||||
return
|
|
||||||
@listening = true
|
|
||||||
@docker.getEvents(filters: type: [ 'container' ])
|
|
||||||
.then (stream) =>
|
|
||||||
stream.on 'error', (err) ->
|
|
||||||
console.error('Error on docker events stream:', err)
|
|
||||||
parser = JSONStream.parse()
|
|
||||||
parser.on 'data', (data) =>
|
|
||||||
if data?.status in ['die', 'start']
|
|
||||||
@getByDockerContainerId(data.id)
|
|
||||||
.catchReturn(NotFoundError, null)
|
|
||||||
.then (service) =>
|
|
||||||
if service?
|
|
||||||
if data.status == 'die'
|
|
||||||
@emit('change')
|
|
||||||
@logger.logSystemEvent(logTypes.serviceExit, { service })
|
|
||||||
@containerHasDied[data.id] = true
|
|
||||||
else if data.status == 'start' and @containerHasDied[data.id]
|
|
||||||
@emit('change')
|
|
||||||
delete @containerHasDied[data.id]
|
|
||||||
@logger.logSystemEvent(logTypes.serviceRestart, { service })
|
|
||||||
@logger.attach(@docker, data.id, service)
|
|
||||||
.catch (err) ->
|
|
||||||
console.error('Error on docker event:', err, err.stack)
|
|
||||||
return
|
|
||||||
new Promise (resolve, reject) ->
|
|
||||||
parser
|
|
||||||
.on 'error', (err) ->
|
|
||||||
console.error('Error on docker events stream', err)
|
|
||||||
reject(err)
|
|
||||||
.on 'end', ->
|
|
||||||
resolve()
|
|
||||||
stream.pipe(parser)
|
|
||||||
.catch (err) ->
|
|
||||||
console.error('Error listening to events:', err, err.stack)
|
|
||||||
.finally =>
|
|
||||||
@listening = false
|
|
||||||
setTimeout =>
|
|
||||||
@listenToEvents()
|
|
||||||
, 1000
|
|
||||||
return
|
|
||||||
|
|
||||||
attachToRunning: =>
|
|
||||||
@getAll()
|
|
||||||
.map (service) =>
|
|
||||||
if service.status == 'Running'
|
|
||||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
|
||||||
@logger.attach(@docker, service.containerId, service)
|
|
12
src/compose/service-manager.d.ts
vendored
12
src/compose/service-manager.d.ts
vendored
@ -1,12 +0,0 @@
|
|||||||
import * as Bluebird from 'bluebird';
|
|
||||||
import { EventEmitter } from 'events';
|
|
||||||
|
|
||||||
import { Service } from '../compose/service';
|
|
||||||
|
|
||||||
// FIXME: Unfinished definition for this class...
|
|
||||||
declare class ServiceManager extends EventEmitter {
|
|
||||||
public getStatus(): Service[];
|
|
||||||
public getAll(): Bluebird<Service[]>;
|
|
||||||
}
|
|
||||||
|
|
||||||
export = ServiceManager;
|
|
660
src/compose/service-manager.ts
Normal file
660
src/compose/service-manager.ts
Normal file
@ -0,0 +1,660 @@
|
|||||||
|
import * as Bluebird from 'bluebird';
|
||||||
|
import * as Dockerode from 'dockerode';
|
||||||
|
import { EventEmitter } from 'events';
|
||||||
|
import * as JSONStream from 'JSONStream';
|
||||||
|
import * as _ from 'lodash';
|
||||||
|
import { fs } from 'mz';
|
||||||
|
import StrictEventEmitter from 'strict-event-emitter-types';
|
||||||
|
|
||||||
|
import Config from '../config';
|
||||||
|
import Docker from '../lib/docker-utils';
|
||||||
|
import Logger from '../logger';
|
||||||
|
|
||||||
|
import { PermissiveNumber } from '../config/types';
|
||||||
|
import constants = require('../lib/constants');
|
||||||
|
import {
|
||||||
|
InternalInconsistencyError,
|
||||||
|
NotFoundError,
|
||||||
|
StatusCodeError,
|
||||||
|
} from '../lib/errors';
|
||||||
|
import * as LogTypes from '../lib/log-types';
|
||||||
|
import { checkInt, isValidDeviceName } from '../lib/validation';
|
||||||
|
import { Service } from './service';
|
||||||
|
|
||||||
|
interface ServiceConstructOpts {
|
||||||
|
docker: Docker;
|
||||||
|
logger: Logger;
|
||||||
|
config: Config;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ServiceManagerEvents {
|
||||||
|
change: void;
|
||||||
|
}
|
||||||
|
type ServiceManagerEventEmitter = StrictEventEmitter<
|
||||||
|
EventEmitter,
|
||||||
|
ServiceManagerEvents
|
||||||
|
>;
|
||||||
|
|
||||||
|
interface KillOpts {
|
||||||
|
removeContainer?: boolean;
|
||||||
|
wait?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ServiceManager extends (EventEmitter as {
|
||||||
|
new (): ServiceManagerEventEmitter;
|
||||||
|
}) {
|
||||||
|
private docker: Docker;
|
||||||
|
private logger: Logger;
|
||||||
|
private config: Config;
|
||||||
|
|
||||||
|
// Whether a container has died, indexed by ID
|
||||||
|
private containerHasDied: Dictionary<boolean> = {};
|
||||||
|
private listening = false;
|
||||||
|
// Volatile state of containers, indexed by containerId (or random strings if
|
||||||
|
// we don't yet have an id)
|
||||||
|
private volatileState: Dictionary<Partial<Service>> = {};
|
||||||
|
|
||||||
|
public constructor(opts: ServiceConstructOpts) {
|
||||||
|
super();
|
||||||
|
this.docker = opts.docker;
|
||||||
|
this.logger = opts.logger;
|
||||||
|
this.config = opts.config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getAll(
|
||||||
|
extraLabelFilters: string | string[] = [],
|
||||||
|
): Promise<Service[]> {
|
||||||
|
const filterLabels = ['supervised'].concat(extraLabelFilters);
|
||||||
|
const containers = await this.listWithBothLabels(filterLabels);
|
||||||
|
|
||||||
|
const services = await Bluebird.map(containers, async container => {
|
||||||
|
try {
|
||||||
|
const serviceInspect = await this.docker
|
||||||
|
.getContainer(container.Id)
|
||||||
|
.inspect();
|
||||||
|
const service = Service.fromDockerContainer(serviceInspect);
|
||||||
|
// We know that the containerId is set below, because `fromDockerContainer`
|
||||||
|
// always sets it
|
||||||
|
const vState = this.volatileState[service.containerId!];
|
||||||
|
if (vState != null && vState.status != null) {
|
||||||
|
service.status = vState.status;
|
||||||
|
}
|
||||||
|
return service;
|
||||||
|
} catch (e) {
|
||||||
|
if (NotFoundError(e)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return services.filter(s => s != null) as Service[];
|
||||||
|
}
|
||||||
|
|
||||||
|
public async get(service: Service) {
|
||||||
|
const services = (await this.getAll(
|
||||||
|
`service-id=${service.serviceId}`,
|
||||||
|
)).filter(currentService => currentService.isEqualConfig(service));
|
||||||
|
|
||||||
|
if (services.length === 0) {
|
||||||
|
const e: StatusCodeError = new Error(
|
||||||
|
'Could not find a container matching this service definition',
|
||||||
|
);
|
||||||
|
e.statusCode = 404;
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return services[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getStatus() {
|
||||||
|
const services = await this.getAll();
|
||||||
|
const status = _.clone(this.volatileState);
|
||||||
|
|
||||||
|
for (const service of services) {
|
||||||
|
if (service.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`containerId not defined in ServiceManager.getStatus: ${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (status[service.containerId] == null) {
|
||||||
|
status[service.containerId] = _.pick(service, [
|
||||||
|
'appId',
|
||||||
|
'imageId',
|
||||||
|
'status',
|
||||||
|
'releaseId',
|
||||||
|
'commit',
|
||||||
|
'createdAt',
|
||||||
|
'serviceName',
|
||||||
|
]) as Partial<Service>;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return _.values(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getByDockerContainerId(
|
||||||
|
containerId: string,
|
||||||
|
): Promise<Service | null> {
|
||||||
|
const container = await this.docker.getContainer(containerId).inspect();
|
||||||
|
if (
|
||||||
|
container.Config.Labels['io.balena.supervised'] == null &&
|
||||||
|
container.Config.Labels['io.resin.supervised'] == null
|
||||||
|
) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return Service.fromDockerContainer(container);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async updateMetadata(
|
||||||
|
service: Service,
|
||||||
|
metadata: { imageId: number; releaseId: number },
|
||||||
|
) {
|
||||||
|
const svc = await this.get(service);
|
||||||
|
if (svc.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`No containerId provided for service ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.docker.getContainer(svc.containerId).rename({
|
||||||
|
name: `${service.serviceName}_${metadata.imageId}_${metadata.releaseId}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async handover(current: Service, target: Service) {
|
||||||
|
// We set the running container to not restart so that in case of a poweroff
|
||||||
|
// it doesn't come back after boot.
|
||||||
|
await this.prepareForHandover(current);
|
||||||
|
await this.start(target);
|
||||||
|
await this.waitToKill(
|
||||||
|
current,
|
||||||
|
target.config.labels['io.balena.update.handover-timeout'],
|
||||||
|
);
|
||||||
|
await this.kill(current);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async killAllLegacy(): Promise<void> {
|
||||||
|
// Containers haven't been normalized (this is an updated supervisor)
|
||||||
|
// so we need to stop and remove them
|
||||||
|
const supervisorImageId = (await this.docker
|
||||||
|
.getImage(constants.supervisorImage)
|
||||||
|
.inspect()).Id;
|
||||||
|
|
||||||
|
for (const container of await this.docker.listContainers({ all: true })) {
|
||||||
|
if (container.ImageID !== supervisorImageId) {
|
||||||
|
await this.killContainer(container.Id, {
|
||||||
|
serviceName: 'legacy',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public kill(service: Service, opts: KillOpts = {}) {
|
||||||
|
if (service.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`Attempt to kill container without containerId! Service :${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return this.killContainer(service.containerId, service, opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async remove(service: Service) {
|
||||||
|
this.logger.logSystemEvent(LogTypes.removeDeadService, { service });
|
||||||
|
const existingService = await this.get(service);
|
||||||
|
|
||||||
|
if (existingService.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`No containerId provided for service ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.docker
|
||||||
|
.getContainer(existingService.containerId)
|
||||||
|
.remove({ v: true });
|
||||||
|
} catch (e) {
|
||||||
|
if (!NotFoundError(e)) {
|
||||||
|
this.logger.logSystemEvent(LogTypes.removeDeadServiceError, {
|
||||||
|
service,
|
||||||
|
error: e,
|
||||||
|
});
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async create(service: Service) {
|
||||||
|
const mockContainerId = this.config.newUniqueKey();
|
||||||
|
try {
|
||||||
|
const existing = await this.get(service);
|
||||||
|
if (existing.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`No containerId provided for service ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return this.docker.getContainer(existing.containerId);
|
||||||
|
} catch (e) {
|
||||||
|
if (!NotFoundError(e)) {
|
||||||
|
this.logger.logSystemEvent(LogTypes.installServiceError, {
|
||||||
|
service,
|
||||||
|
error: e,
|
||||||
|
});
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceName = await this.config.get('name');
|
||||||
|
if (!isValidDeviceName(deviceName)) {
|
||||||
|
throw new Error(
|
||||||
|
'The device name contains a newline, which is unsupported by balena. ' +
|
||||||
|
'Please fix the device name',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const conf = service.toDockerContainer({ deviceName });
|
||||||
|
const nets = service.extraNetworksToJoin();
|
||||||
|
|
||||||
|
this.logger.logSystemEvent(LogTypes.installService, { service });
|
||||||
|
this.reportNewStatus(mockContainerId, service, 'Installing');
|
||||||
|
|
||||||
|
const container = await this.docker.createContainer(conf);
|
||||||
|
service.containerId = container.id;
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
_.map(nets, (endpointConfig, name) =>
|
||||||
|
this.docker.getNetwork(name).connect({
|
||||||
|
Container: container.id,
|
||||||
|
EndpointConfig: endpointConfig,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.logSystemEvent(LogTypes.installServiceSuccess, { service });
|
||||||
|
return container;
|
||||||
|
} finally {
|
||||||
|
this.reportChange(mockContainerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async start(service: Service) {
|
||||||
|
let alreadyStarted = false;
|
||||||
|
let containerId: string | null = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const container = await this.create(service);
|
||||||
|
containerId = container.id;
|
||||||
|
this.logger.logSystemEvent(LogTypes.startService, { service });
|
||||||
|
|
||||||
|
this.reportNewStatus(containerId, service, 'Starting');
|
||||||
|
|
||||||
|
let remove = false;
|
||||||
|
let err: Error | undefined;
|
||||||
|
try {
|
||||||
|
await container.start();
|
||||||
|
} catch (e) {
|
||||||
|
// Get the statusCode from the original cause and make sure it's
|
||||||
|
// definitely an int for comparison reasons
|
||||||
|
const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
|
||||||
|
if (maybeStatusCode.isLeft()) {
|
||||||
|
remove = true;
|
||||||
|
err = new Error(
|
||||||
|
`Could not parse status code from docker error: ${e}`,
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const statusCode = maybeStatusCode.value;
|
||||||
|
const message = e.message;
|
||||||
|
|
||||||
|
// 304 means the container was already started, precisely what we want
|
||||||
|
if (statusCode === 304) {
|
||||||
|
alreadyStarted = true;
|
||||||
|
} else if (
|
||||||
|
statusCode === 500 &&
|
||||||
|
_.isString(message) &&
|
||||||
|
message.trim().match(/exec format error$/)
|
||||||
|
) {
|
||||||
|
// Provide a friendlier error message for "exec format error"
|
||||||
|
const deviceType = await this.config.get('deviceType');
|
||||||
|
err = new Error(
|
||||||
|
`Application architecture incompatible with ${deviceType}: exec format error`,
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
} else {
|
||||||
|
// rethrow the same error
|
||||||
|
err = e;
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (remove) {
|
||||||
|
// If starting the container fialed, we remove it so that it doesn't litter
|
||||||
|
await container.remove({ v: true }).catch(_.noop);
|
||||||
|
this.logger.logSystemEvent(LogTypes.startServiceError, {
|
||||||
|
service,
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const serviceId = service.serviceId;
|
||||||
|
const imageId = service.imageId;
|
||||||
|
if (serviceId == null || imageId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`serviceId and imageId not defined for service: ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.start`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.attach(this.docker, container.id, { serviceId, imageId });
|
||||||
|
|
||||||
|
if (alreadyStarted) {
|
||||||
|
this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
|
||||||
|
} else {
|
||||||
|
this.logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
|
||||||
|
}
|
||||||
|
|
||||||
|
service.config.running = true;
|
||||||
|
return container;
|
||||||
|
} finally {
|
||||||
|
if (containerId != null) {
|
||||||
|
this.reportChange(containerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public listenToEvents() {
|
||||||
|
if (this.listening) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.listening = true;
|
||||||
|
|
||||||
|
const listen = async () => {
|
||||||
|
const stream = await this.docker.getEvents({
|
||||||
|
filters: { type: ['container'] },
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.on('error', e => {
|
||||||
|
console.error(`Error on docker events stream:`, e);
|
||||||
|
});
|
||||||
|
const parser = JSONStream.parse();
|
||||||
|
parser.on('data', async (data: { status: string; id: string }) => {
|
||||||
|
if (data != null) {
|
||||||
|
const status = data.status;
|
||||||
|
if (status === 'die' || status === 'start') {
|
||||||
|
try {
|
||||||
|
let service: Service | null = null;
|
||||||
|
try {
|
||||||
|
service = await this.getByDockerContainerId(data.id);
|
||||||
|
} catch (e) {
|
||||||
|
if (!NotFoundError(e)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (service != null) {
|
||||||
|
this.emit('change');
|
||||||
|
if (status === 'die') {
|
||||||
|
this.logger.logSystemEvent(LogTypes.serviceExit, { service });
|
||||||
|
this.containerHasDied[data.id] = true;
|
||||||
|
} else if (
|
||||||
|
status === 'start' &&
|
||||||
|
this.containerHasDied[data.id]
|
||||||
|
) {
|
||||||
|
delete this.containerHasDied[data.id];
|
||||||
|
this.logger.logSystemEvent(LogTypes.serviceRestart, {
|
||||||
|
service,
|
||||||
|
});
|
||||||
|
|
||||||
|
const serviceId = service.serviceId;
|
||||||
|
const imageId = service.imageId;
|
||||||
|
if (serviceId == null || imageId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`serviceId and imageId not defined for service: ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.listenToEvents`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.logger.attach(this.docker, data.id, {
|
||||||
|
serviceId,
|
||||||
|
imageId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error('Error on docker event:', e, e.stack);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
parser
|
||||||
|
.on('error', (e: Error) => {
|
||||||
|
console.error('Error on docker events stream:', e);
|
||||||
|
reject(e);
|
||||||
|
})
|
||||||
|
.on('end', resolve);
|
||||||
|
stream.pipe(parser);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
Bluebird.resolve(listen())
|
||||||
|
.catch(e => {
|
||||||
|
console.error('Error listening to events:', e, e.stack);
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
this.listening = false;
|
||||||
|
setTimeout(() => this.listenToEvents(), 1000);
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async attachToRunning() {
|
||||||
|
const services = await this.getAll();
|
||||||
|
for (const service of services) {
|
||||||
|
if (service.status === 'Running') {
|
||||||
|
this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
|
||||||
|
|
||||||
|
const serviceId = service.serviceId;
|
||||||
|
const imageId = service.imageId;
|
||||||
|
if (serviceId == null || imageId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`serviceId and imageId not defined for service: ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.start`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (service.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`containerId not defined for service: ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.attachToRunning`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.logger.attach(this.docker, service.containerId, {
|
||||||
|
serviceId,
|
||||||
|
imageId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private reportChange(containerId?: string, status?: Partial<Service>) {
|
||||||
|
if (containerId != null) {
|
||||||
|
if (status != null) {
|
||||||
|
this.volatileState[containerId] = {};
|
||||||
|
_.merge(this.volatileState[containerId], status);
|
||||||
|
} else if (this.volatileState[containerId] != null) {
|
||||||
|
delete this.volatileState[containerId];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.emit('change');
|
||||||
|
}
|
||||||
|
|
||||||
|
private reportNewStatus(
|
||||||
|
containerId: string,
|
||||||
|
service: Partial<Service>,
|
||||||
|
status: string,
|
||||||
|
) {
|
||||||
|
this.reportChange(
|
||||||
|
containerId,
|
||||||
|
_.merge(
|
||||||
|
{ status },
|
||||||
|
_.pick(service, ['imageId', 'appId', 'releaseId', 'commit']),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private killContainer(
|
||||||
|
containerId: string,
|
||||||
|
service: Partial<Service> = {},
|
||||||
|
{ removeContainer = true, wait = false }: KillOpts = {},
|
||||||
|
): Bluebird<void> {
|
||||||
|
// To maintain compatibility of the `wait` flag, this function is not
|
||||||
|
// async, but it feels like whether or not the promise should be waited on
|
||||||
|
// should performed by the caller
|
||||||
|
// TODO: Remove the need for the wait flag
|
||||||
|
|
||||||
|
return Bluebird.try(() => {
|
||||||
|
this.logger.logSystemEvent(LogTypes.stopService, { service });
|
||||||
|
if (service.imageId != null) {
|
||||||
|
this.reportNewStatus(containerId, service, 'Stopping');
|
||||||
|
}
|
||||||
|
|
||||||
|
const containerObj = this.docker.getContainer(containerId);
|
||||||
|
const killPromise = Bluebird.resolve(containerObj.stop())
|
||||||
|
.then(() => {
|
||||||
|
if (removeContainer) {
|
||||||
|
return containerObj.remove({ v: true });
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch(e => {
|
||||||
|
// Get the statusCode from the original cause and make sure it's
|
||||||
|
// definitely an int for comparison reasons
|
||||||
|
const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
|
||||||
|
if (maybeStatusCode.isLeft()) {
|
||||||
|
throw new Error(
|
||||||
|
`Could not parse status code from docker error: ${e}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const statusCode = maybeStatusCode.value;
|
||||||
|
|
||||||
|
// 304 means the container was already stopped, so we can just remove it
|
||||||
|
if (statusCode === 304) {
|
||||||
|
this.logger.logSystemEvent(LogTypes.stopServiceNoop, { service });
|
||||||
|
// Why do we attempt to remove the container again?
|
||||||
|
if (removeContainer) {
|
||||||
|
return containerObj.remove({ v: true });
|
||||||
|
}
|
||||||
|
} else if (statusCode === 404) {
|
||||||
|
// 404 means the container doesn't exist, precisely what we want!
|
||||||
|
this.logger.logSystemEvent(LogTypes.stopRemoveServiceNoop, {
|
||||||
|
service,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.tap(() => {
|
||||||
|
delete this.containerHasDied[containerId];
|
||||||
|
this.logger.logSystemEvent(LogTypes.stopServiceSuccess, { service });
|
||||||
|
})
|
||||||
|
.catch(e => {
|
||||||
|
this.logger.logSystemEvent(LogTypes.stopServiceError, {
|
||||||
|
service,
|
||||||
|
error: e,
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
if (service.imageId != null) {
|
||||||
|
this.reportChange(containerId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (wait) {
|
||||||
|
return killPromise;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async listWithBothLabels(
|
||||||
|
labelList: string[],
|
||||||
|
): Promise<Dockerode.ContainerInfo[]> {
|
||||||
|
const listWithPrefix = (prefix: string) =>
|
||||||
|
this.docker.listContainers({
|
||||||
|
all: true,
|
||||||
|
filters: {
|
||||||
|
label: _.map(labelList, v => `${prefix}${v}`),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const [legacy, current] = await Promise.all([
|
||||||
|
listWithPrefix('io.resin.'),
|
||||||
|
listWithPrefix('io.balena.'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
return _.unionBy(legacy, current, 'Id');
|
||||||
|
}
|
||||||
|
|
||||||
|
private async prepareForHandover(service: Service) {
|
||||||
|
const svc = await this.get(service);
|
||||||
|
if (svc.containerId == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
`No containerId provided for service ${
|
||||||
|
service.serviceName
|
||||||
|
} in ServiceManager.prepareForHandover. Service: ${service}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const container = this.docker.getContainer(svc.containerId);
|
||||||
|
await container.update({ RestartPolicy: {} });
|
||||||
|
return await container.rename({
|
||||||
|
name: `old_${service.serviceName}_${service.imageId}_${service.imageId}_${
|
||||||
|
service.releaseId
|
||||||
|
}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private waitToKill(service: Service, timeout: number | string) {
|
||||||
|
const pollInterval = 100;
|
||||||
|
timeout = checkInt(timeout, { positive: true }) || 60000;
|
||||||
|
const deadline = Date.now() + timeout;
|
||||||
|
|
||||||
|
const handoverCompletePaths = service.handoverCompleteFullPathsOnHost();
|
||||||
|
|
||||||
|
const wait = (): Bluebird<void> =>
|
||||||
|
Bluebird.any(
|
||||||
|
handoverCompletePaths.map(file =>
|
||||||
|
fs.stat(file).then(() => fs.unlink(file).catch(_.noop)),
|
||||||
|
),
|
||||||
|
).catch(async () => {
|
||||||
|
if (Date.now() < deadline) {
|
||||||
|
await Bluebird.delay(pollInterval);
|
||||||
|
return wait();
|
||||||
|
} else {
|
||||||
|
console.log(
|
||||||
|
`Handover timeout has passed, assuming handover was completed for service ${
|
||||||
|
service.serviceName
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`Waiting for handover to be completed for service: ${
|
||||||
|
service.serviceName
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return wait().then(() => {
|
||||||
|
console.log(`Handover complete for service ${service.serviceName}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -17,9 +17,9 @@ import {
|
|||||||
} from './types/service';
|
} from './types/service';
|
||||||
import * as ComposeUtils from './utils';
|
import * as ComposeUtils from './utils';
|
||||||
|
|
||||||
|
import * as constants from '../lib/constants';
|
||||||
import * as updateLock from '../lib/update-lock';
|
import * as updateLock from '../lib/update-lock';
|
||||||
import { sanitiseComposeConfig } from './sanitise';
|
import { sanitiseComposeConfig } from './sanitise';
|
||||||
import * as constants from '../lib/constants';
|
|
||||||
|
|
||||||
export class Service {
|
export class Service {
|
||||||
public appId: number | null;
|
public appId: number | null;
|
||||||
|
@ -2,7 +2,7 @@ import * as Dockerode from 'dockerode';
|
|||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
import * as path from 'path';
|
import * as path from 'path';
|
||||||
|
|
||||||
import Docker = require('../lib/docker-utils');
|
import Docker from '../lib/docker-utils';
|
||||||
import Logger from '../logger';
|
import Logger from '../logger';
|
||||||
|
|
||||||
import constants = require('../lib/constants');
|
import constants = require('../lib/constants');
|
||||||
@ -205,7 +205,8 @@ export class Volumes {
|
|||||||
private async listWithBothLabels(): Promise<Dockerode.VolumeInspectInfo[]> {
|
private async listWithBothLabels(): Promise<Dockerode.VolumeInspectInfo[]> {
|
||||||
// We have to cast the listVolumes call from any[] to any below, until the
|
// We have to cast the listVolumes call from any[] to any below, until the
|
||||||
// relevant PR: https://github.com/DefinitelyTyped/DefinitelyTyped/pull/32383
|
// relevant PR: https://github.com/DefinitelyTyped/DefinitelyTyped/pull/32383
|
||||||
// is merged and released
|
// is merged and released - we can also replace Dockerode here with the Docker
|
||||||
|
// DockerUtils class imported above
|
||||||
const [legacyResponse, currentResponse]: [
|
const [legacyResponse, currentResponse]: [
|
||||||
Dockerode.VolumeInfoList,
|
Dockerode.VolumeInfoList,
|
||||||
Dockerode.VolumeInfoList
|
Dockerode.VolumeInfoList
|
||||||
|
@ -1,168 +0,0 @@
|
|||||||
constants = require './constants'
|
|
||||||
DockerToolbelt = require 'docker-toolbelt'
|
|
||||||
{ DockerProgress } = require 'docker-progress'
|
|
||||||
Promise = require 'bluebird'
|
|
||||||
dockerDelta = require 'docker-delta'
|
|
||||||
_ = require 'lodash'
|
|
||||||
{ request, resumable } = require './request'
|
|
||||||
{ envArrayToObject } = require './conversions'
|
|
||||||
{ DeltaStillProcessingError, InvalidNetGatewayError } = require './errors'
|
|
||||||
{ checkInt } = require './validation'
|
|
||||||
|
|
||||||
applyRsyncDelta = (imgSrc, deltaUrl, applyTimeout, opts, onProgress, log) ->
|
|
||||||
log('Applying rsync delta...')
|
|
||||||
new Promise (resolve, reject) ->
|
|
||||||
req = resumable(Object.assign({ url: deltaUrl }, opts))
|
|
||||||
.on('progress', onProgress)
|
|
||||||
.on('retry', onProgress)
|
|
||||||
.on('error', reject)
|
|
||||||
.on 'response', (res) ->
|
|
||||||
if res.statusCode isnt 200
|
|
||||||
reject(new Error("Got #{res.statusCode} when requesting delta from storage."))
|
|
||||||
else if parseInt(res.headers['content-length']) is 0
|
|
||||||
reject(new Error('Invalid delta URL.'))
|
|
||||||
else
|
|
||||||
deltaStream = dockerDelta.applyDelta(imgSrc, { log, timeout: applyTimeout })
|
|
||||||
res.pipe(deltaStream)
|
|
||||||
.on('id', (id) -> resolve('sha256:' + id))
|
|
||||||
.on 'error', (err) ->
|
|
||||||
log("Delta stream emitted error: #{err}")
|
|
||||||
req.abort(err)
|
|
||||||
reject(err)
|
|
||||||
|
|
||||||
applyBalenaDelta = (docker, deltaImg, token, onProgress, log) ->
|
|
||||||
log('Applying balena delta...')
|
|
||||||
if token?
|
|
||||||
log('Using registry auth token')
|
|
||||||
auth = { authconfig: registrytoken: token }
|
|
||||||
docker.dockerProgress.pull(deltaImg, onProgress, auth)
|
|
||||||
.then ->
|
|
||||||
docker.getImage(deltaImg).inspect().get('Id')
|
|
||||||
|
|
||||||
module.exports = class DockerUtils extends DockerToolbelt
|
|
||||||
constructor: (opts) ->
|
|
||||||
super(opts)
|
|
||||||
@dockerProgress = new DockerProgress(dockerToolbelt: this)
|
|
||||||
@supervisorTagPromise = @normaliseImageName(constants.supervisorImage)
|
|
||||||
|
|
||||||
getRepoAndTag: (image) =>
|
|
||||||
@getRegistryAndName(image)
|
|
||||||
.then ({ registry, imageName, tagName }) ->
|
|
||||||
if registry?
|
|
||||||
registry = registry.toString().replace(':443', '')
|
|
||||||
repoName = "#{registry}/#{imageName}"
|
|
||||||
else
|
|
||||||
repoName = imageName
|
|
||||||
return { repo: repoName, tag: tagName }
|
|
||||||
|
|
||||||
fetchDeltaWithProgress: (imgDest, fullDeltaOpts, onProgress) =>
|
|
||||||
{
|
|
||||||
deltaRequestTimeout, deltaApplyTimeout, deltaRetryCount, deltaRetryInterval,
|
|
||||||
uuid, currentApiKey, deltaEndpoint, apiEndpoint,
|
|
||||||
deltaSource, deltaSourceId, deltaVersion
|
|
||||||
} = fullDeltaOpts
|
|
||||||
retryCount = checkInt(deltaRetryCount)
|
|
||||||
retryInterval = checkInt(deltaRetryInterval)
|
|
||||||
requestTimeout = checkInt(deltaRequestTimeout)
|
|
||||||
applyTimeout = checkInt(deltaApplyTimeout)
|
|
||||||
version = checkInt(deltaVersion)
|
|
||||||
deltaSourceId ?= deltaSource
|
|
||||||
|
|
||||||
log = (str) ->
|
|
||||||
console.log("delta(#{deltaSource}): #{str}")
|
|
||||||
|
|
||||||
if not (version in [ 2, 3 ])
|
|
||||||
log("Unsupported delta version: #{version}. Falling back to regular pull")
|
|
||||||
return @fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress)
|
|
||||||
|
|
||||||
# Since the supervisor never calls this function without a source anymore,
|
|
||||||
# this should never happen, but we handle it anyways.
|
|
||||||
if !deltaSource?
|
|
||||||
log('Falling back to regular pull due to lack of a delta source')
|
|
||||||
return @fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress)
|
|
||||||
|
|
||||||
docker = this
|
|
||||||
|
|
||||||
log("Starting delta to #{imgDest}")
|
|
||||||
Promise.join @getRegistryAndName(imgDest), @getRegistryAndName(deltaSource), (dstInfo, srcInfo) ->
|
|
||||||
tokenEndpoint = "#{apiEndpoint}/auth/v1/token"
|
|
||||||
opts =
|
|
||||||
auth:
|
|
||||||
user: 'd_' + uuid
|
|
||||||
pass: currentApiKey
|
|
||||||
sendImmediately: true
|
|
||||||
json: true
|
|
||||||
timeout: requestTimeout
|
|
||||||
url = "#{tokenEndpoint}?service=#{dstInfo.registry}&scope=repository:#{dstInfo.imageName}:pull&scope=repository:#{srcInfo.imageName}:pull"
|
|
||||||
request.getAsync(url, opts)
|
|
||||||
.get(1)
|
|
||||||
.then (responseBody) ->
|
|
||||||
token = responseBody?.token
|
|
||||||
opts =
|
|
||||||
followRedirect: false
|
|
||||||
timeout: requestTimeout
|
|
||||||
|
|
||||||
if token?
|
|
||||||
opts.auth =
|
|
||||||
bearer: token
|
|
||||||
sendImmediately: true
|
|
||||||
request.getAsync("#{deltaEndpoint}/api/v#{version}/delta?src=#{deltaSource}&dest=#{imgDest}", opts)
|
|
||||||
.spread (res, data) ->
|
|
||||||
if res.statusCode in [ 502, 504 ]
|
|
||||||
throw new DeltaStillProcessingError()
|
|
||||||
switch version
|
|
||||||
when 2
|
|
||||||
if not (300 <= res.statusCode < 400 and res.headers['location']?)
|
|
||||||
throw new Error("Got #{res.statusCode} when requesting image from delta server.")
|
|
||||||
deltaUrl = res.headers['location']
|
|
||||||
if !deltaSource?
|
|
||||||
deltaSrc = null
|
|
||||||
else
|
|
||||||
deltaSrc = deltaSourceId
|
|
||||||
resumeOpts = { timeout: requestTimeout, maxRetries: retryCount, retryInterval }
|
|
||||||
applyRsyncDelta(deltaSrc, deltaUrl, applyTimeout, resumeOpts, onProgress, log)
|
|
||||||
when 3
|
|
||||||
if res.statusCode isnt 200
|
|
||||||
throw new Error("Got #{res.statusCode} when requesting image from delta server.")
|
|
||||||
name = JSON.parse(data).name
|
|
||||||
applyBalenaDelta(docker, name, token, onProgress, log)
|
|
||||||
else
|
|
||||||
# we guard against arbitrary versions above, so this can't really happen
|
|
||||||
throw new Error("Unsupported delta version: #{version}")
|
|
||||||
.catch dockerDelta.OutOfSyncError, (err) =>
|
|
||||||
log('Falling back to regular pull')
|
|
||||||
@fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress)
|
|
||||||
.tap ->
|
|
||||||
log('Delta applied successfully')
|
|
||||||
.tapCatch (err) ->
|
|
||||||
log("Delta failed with: #{err}")
|
|
||||||
|
|
||||||
|
|
||||||
fetchImageWithProgress: (image, { uuid, currentApiKey }, onProgress) =>
|
|
||||||
@getRegistryAndName(image)
|
|
||||||
.then ({ registry }) =>
|
|
||||||
dockerOptions =
|
|
||||||
authconfig:
|
|
||||||
username: 'd_' + uuid,
|
|
||||||
password: currentApiKey,
|
|
||||||
serveraddress: registry
|
|
||||||
@dockerProgress.pull(image, onProgress, dockerOptions)
|
|
||||||
.then =>
|
|
||||||
@getImage(image).inspect().get('Id')
|
|
||||||
|
|
||||||
getImageEnv: (id) ->
|
|
||||||
@getImage(id).inspect()
|
|
||||||
.get('Config').get('Env')
|
|
||||||
.then(envArrayToObject)
|
|
||||||
.catch (err) ->
|
|
||||||
console.log('Error getting env from image', err, err.stack)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
getNetworkGateway: (netName) =>
|
|
||||||
return Promise.resolve('127.0.0.1') if netName == 'host'
|
|
||||||
@getNetwork(netName).inspect()
|
|
||||||
.then (netInfo) ->
|
|
||||||
conf = netInfo?.IPAM?.Config?[0]
|
|
||||||
return conf.Gateway if conf?.Gateway?
|
|
||||||
return conf.Subnet.replace('.0/16', '.1') if _.endsWith(conf?.Subnet, '.0/16')
|
|
||||||
throw new InvalidNetGatewayError("Cannot determine network gateway for #{netName}")
|
|
41
src/lib/docker-utils.d.ts
vendored
41
src/lib/docker-utils.d.ts
vendored
@ -1,41 +0,0 @@
|
|||||||
import * as Bluebird from 'bluebird';
|
|
||||||
import DockerToolbelt = require('docker-toolbelt');
|
|
||||||
import { SchemaReturn } from '../config/schema-type';
|
|
||||||
|
|
||||||
// This is the EnvVarObject from src/lib/types, but it seems we cannot
|
|
||||||
// reference it relatively. Just redefine it as it's simple and won't change
|
|
||||||
// often
|
|
||||||
|
|
||||||
interface EnvVarObject {
|
|
||||||
[name: string]: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface TaggedRepoImage {
|
|
||||||
repo: string;
|
|
||||||
tag: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
type FetchOptions = SchemaReturn<'fetchOptions'>;
|
|
||||||
|
|
||||||
declare class DockerUtils extends DockerToolbelt {
|
|
||||||
constructor(opts: any);
|
|
||||||
|
|
||||||
getRepoAndTag(image: string): Bluebird<TaggedRepoImage>;
|
|
||||||
|
|
||||||
fetchDeltaWithProgress(
|
|
||||||
imgDest: string,
|
|
||||||
fullDeltaOpts: any,
|
|
||||||
onProgress: (args: any) => void,
|
|
||||||
): Bluebird<string>;
|
|
||||||
|
|
||||||
fetchImageWithProgress(
|
|
||||||
image: string,
|
|
||||||
config: FetchOptions,
|
|
||||||
onProgress: (args: any) => void,
|
|
||||||
): Bluebird<string>;
|
|
||||||
|
|
||||||
getImageEnv(id: string): Bluebird<EnvVarObject>;
|
|
||||||
getNetworkGateway(netName: string): Bluebird<string>;
|
|
||||||
}
|
|
||||||
|
|
||||||
export = DockerUtils;
|
|
331
src/lib/docker-utils.ts
Normal file
331
src/lib/docker-utils.ts
Normal file
@ -0,0 +1,331 @@
|
|||||||
|
import { DockerProgress, ProgressCallback } from 'docker-progress';
|
||||||
|
import * as Dockerode from 'dockerode';
|
||||||
|
import * as _ from 'lodash';
|
||||||
|
|
||||||
|
import { applyDelta, OutOfSyncError } from 'docker-delta';
|
||||||
|
import DockerToolbelt = require('docker-toolbelt');
|
||||||
|
|
||||||
|
import { SchemaReturn } from '../config/schema-type';
|
||||||
|
import { envArrayToObject } from './conversions';
|
||||||
|
import {
|
||||||
|
DeltaStillProcessingError,
|
||||||
|
InternalInconsistencyError,
|
||||||
|
InvalidNetGatewayError,
|
||||||
|
} from './errors';
|
||||||
|
import { request, requestLib, resumable } from './request';
|
||||||
|
import { EnvVarObject } from './types';
|
||||||
|
|
||||||
|
export type FetchOptions = SchemaReturn<'fetchOptions'>;
|
||||||
|
export type DeltaFetchOptions = FetchOptions & {
|
||||||
|
deltaSourceId: string;
|
||||||
|
deltaSource: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
interface RsyncApplyOptions {
|
||||||
|
timeout: number;
|
||||||
|
maxRetries: number;
|
||||||
|
retryInterval: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class DockerUtils extends DockerToolbelt {
|
||||||
|
public dockerProgress: DockerProgress;
|
||||||
|
|
||||||
|
public constructor(opts: Dockerode.DockerOptions) {
|
||||||
|
super(opts);
|
||||||
|
this.dockerProgress = new DockerProgress({ dockerToolbelt: this });
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getRepoAndTag(
|
||||||
|
image: string,
|
||||||
|
): Promise<{ repo: string; tag: string }> {
|
||||||
|
const { registry, imageName, tagName } = await this.getRegistryAndName(
|
||||||
|
image,
|
||||||
|
);
|
||||||
|
|
||||||
|
let repoName = imageName;
|
||||||
|
|
||||||
|
if (registry != null) {
|
||||||
|
repoName = `${registry}/${imageName}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { repo: repoName, tag: tagName };
|
||||||
|
}
|
||||||
|
|
||||||
|
public async fetchDeltaWithProgress(
|
||||||
|
imgDest: string,
|
||||||
|
deltaOpts: DeltaFetchOptions,
|
||||||
|
onProgress: ProgressCallback,
|
||||||
|
): Promise<string> {
|
||||||
|
const deltaSourceId =
|
||||||
|
deltaOpts.deltaSourceId != null
|
||||||
|
? deltaOpts.deltaSourceId
|
||||||
|
: deltaOpts.deltaSource;
|
||||||
|
|
||||||
|
const timeout = deltaOpts.deltaApplyTimeout;
|
||||||
|
|
||||||
|
if (timeout == null) {
|
||||||
|
throw new InternalInconsistencyError(
|
||||||
|
'A delta apply timeout is not set in fetchDeltaWithProgress!',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const log = (str: string) =>
|
||||||
|
console.log(`delta(${deltaOpts.deltaSource}): ${str}`);
|
||||||
|
|
||||||
|
if (!_.includes([2, 3], deltaOpts.deltaVersion)) {
|
||||||
|
log(
|
||||||
|
`Unsupported delta version: ${
|
||||||
|
deltaOpts.deltaVersion
|
||||||
|
}. Failling back to regular pull`,
|
||||||
|
);
|
||||||
|
return await this.fetchImageWithProgress(imgDest, deltaOpts, onProgress);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the supevisor never calls this function with a source anymore,
|
||||||
|
// this should never happen, but w ehandle it anyway
|
||||||
|
if (deltaOpts.deltaSource == null) {
|
||||||
|
log('Falling back to regular pull due to lack of a delta source');
|
||||||
|
return this.fetchImageWithProgress(imgDest, deltaOpts, onProgress);
|
||||||
|
}
|
||||||
|
|
||||||
|
const docker = this;
|
||||||
|
log(`Starting delta to ${imgDest}`);
|
||||||
|
|
||||||
|
const [dstInfo, srcInfo] = await Promise.all([
|
||||||
|
this.getRegistryAndName(imgDest),
|
||||||
|
this.getRegistryAndName(deltaOpts.deltaSource),
|
||||||
|
]);
|
||||||
|
|
||||||
|
const tokenEndpoint = `${deltaOpts.apiEndpoint}/auth/v1/token`;
|
||||||
|
const tokenOpts: requestLib.CoreOptions = {
|
||||||
|
auth: {
|
||||||
|
user: `d_${deltaOpts.uuid}`,
|
||||||
|
pass: deltaOpts.currentApiKey,
|
||||||
|
sendImmediately: true,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const tokenUrl = `${tokenEndpoint}?service=${
|
||||||
|
dstInfo.registry
|
||||||
|
}&scope=repository:${dstInfo.imageName}:pull&scope=repository:${
|
||||||
|
srcInfo.imageName
|
||||||
|
}:pull`;
|
||||||
|
|
||||||
|
const tokenResponseBody = (await request.getAsync(tokenUrl, tokenOpts))[1];
|
||||||
|
const token = tokenResponseBody != null ? tokenResponseBody.token : null;
|
||||||
|
|
||||||
|
const opts: requestLib.CoreOptions = {
|
||||||
|
followRedirect: false,
|
||||||
|
timeout: deltaOpts.deltaRequestTimeout,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (token != null) {
|
||||||
|
opts.auth = {
|
||||||
|
bearer: token,
|
||||||
|
sendImmediately: true,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const url = `${deltaOpts.deltaEndpoint}/api/v${
|
||||||
|
deltaOpts.deltaVersion
|
||||||
|
}/delta?src=${deltaOpts.deltaSource}&dest=${imgDest}`;
|
||||||
|
|
||||||
|
const [res, data] = await request.getAsync(url, opts);
|
||||||
|
if (res.statusCode === 502 || res.statusCode === 504) {
|
||||||
|
throw new DeltaStillProcessingError();
|
||||||
|
}
|
||||||
|
let id: string;
|
||||||
|
try {
|
||||||
|
switch (deltaOpts.deltaVersion) {
|
||||||
|
case 2:
|
||||||
|
if (
|
||||||
|
!(
|
||||||
|
res.statusCode >= 300 &&
|
||||||
|
res.statusCode < 400 &&
|
||||||
|
res.headers['location'] != null
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw new Error(
|
||||||
|
`Got ${res.statusCode} when request image from delta server.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const deltaUrl = res.headers['location'];
|
||||||
|
const deltaSrc = deltaSourceId;
|
||||||
|
const resumeOpts = {
|
||||||
|
timeout: deltaOpts.deltaRequestTimeout,
|
||||||
|
maxRetries: deltaOpts.deltaRetryCount,
|
||||||
|
retryInterval: deltaOpts.deltaRetryInterval,
|
||||||
|
};
|
||||||
|
id = await DockerUtils.applyRsyncDelta(
|
||||||
|
deltaSrc,
|
||||||
|
deltaUrl,
|
||||||
|
timeout,
|
||||||
|
resumeOpts,
|
||||||
|
onProgress,
|
||||||
|
log,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
if (res.statusCode !== 200) {
|
||||||
|
throw new Error(
|
||||||
|
`Got ${
|
||||||
|
res.statusCode
|
||||||
|
} when requesting v3 delta from delta server.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let name;
|
||||||
|
try {
|
||||||
|
name = JSON.parse(data).name;
|
||||||
|
} catch (e) {
|
||||||
|
throw new Error(
|
||||||
|
`Got an error when parsing delta server response for v3 delta: ${e}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
id = await DockerUtils.applyBalenaDelta(
|
||||||
|
docker,
|
||||||
|
name,
|
||||||
|
token,
|
||||||
|
onProgress,
|
||||||
|
log,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error(
|
||||||
|
`Unsupposed delta version: ${deltaOpts.deltaVersion}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof OutOfSyncError) {
|
||||||
|
log('Falling back to regular pull due to delta out of sync error');
|
||||||
|
return await this.fetchImageWithProgress(
|
||||||
|
imgDest,
|
||||||
|
deltaOpts,
|
||||||
|
onProgress,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
log(`Delta failed with ${e}`);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log(`Delta applied successfully`);
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async fetchImageWithProgress(
|
||||||
|
image: string,
|
||||||
|
{ uuid, currentApiKey }: FetchOptions,
|
||||||
|
onProgress: ProgressCallback,
|
||||||
|
): Promise<string> {
|
||||||
|
const { registry } = await this.getRegistryAndName(image);
|
||||||
|
|
||||||
|
const dockerOpts = {
|
||||||
|
authconfig: {
|
||||||
|
username: `d_${uuid}`,
|
||||||
|
password: currentApiKey,
|
||||||
|
serverAddress: registry,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.dockerProgress.pull(image, onProgress, dockerOpts);
|
||||||
|
return (await this.getImage(image).inspect()).Id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getImageEnv(id: string): Promise<EnvVarObject> {
|
||||||
|
const inspect = await this.getImage(id).inspect();
|
||||||
|
|
||||||
|
try {
|
||||||
|
return envArrayToObject(_.get(inspect, ['Config', 'Env'], []));
|
||||||
|
} catch (e) {
|
||||||
|
console.log('Error getting env from image', e, e.stack);
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getNetworkGateway(networkName: string): Promise<string> {
|
||||||
|
if (networkName === 'host') {
|
||||||
|
return '127.0.0.1';
|
||||||
|
}
|
||||||
|
|
||||||
|
const network = await this.getNetwork(networkName).inspect();
|
||||||
|
const config = _.get(network, ['IPAM', 'Config', '0']);
|
||||||
|
if (config != null) {
|
||||||
|
if (config.Gateway != null) {
|
||||||
|
return config.Gateway;
|
||||||
|
}
|
||||||
|
if (config.Subnet != null && _.endsWith(config.Subnet, '.0/16')) {
|
||||||
|
return config.Subnet.replace('.0/16', '.1');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new InvalidNetGatewayError(
|
||||||
|
`Cannot determine network gateway for ${networkName}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static applyRsyncDelta(
|
||||||
|
imgSrc: string,
|
||||||
|
deltaUrl: string,
|
||||||
|
applyTimeout: number,
|
||||||
|
opts: RsyncApplyOptions,
|
||||||
|
onProgress: ProgressCallback,
|
||||||
|
log: (str: string) => void,
|
||||||
|
): Promise<string> {
|
||||||
|
log('Applying rsync delta...');
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const req = resumable(Object.assign({ url: deltaUrl }, opts));
|
||||||
|
req
|
||||||
|
.on('progress', onProgress)
|
||||||
|
.on('retry', onProgress)
|
||||||
|
.on('error', reject)
|
||||||
|
.on('response', res => {
|
||||||
|
if (res.statusCode !== 200) {
|
||||||
|
reject(
|
||||||
|
new Error(
|
||||||
|
`Got ${res.statusCode} when requesting delta from storage.`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
} else if (parseInt(res.headers['content-length'] || '0', 10) === 0) {
|
||||||
|
reject(new Error('Invalid delta URL'));
|
||||||
|
} else {
|
||||||
|
const deltaStream = applyDelta(imgSrc, {
|
||||||
|
log,
|
||||||
|
timeout: applyTimeout,
|
||||||
|
});
|
||||||
|
res
|
||||||
|
.pipe(deltaStream)
|
||||||
|
.on('id', id => resolve(`sha256:${id}`))
|
||||||
|
.on('error', err => {
|
||||||
|
log(`Delta stream emitted error: ${err}`);
|
||||||
|
req.abort();
|
||||||
|
reject(err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async applyBalenaDelta(
|
||||||
|
docker: DockerUtils,
|
||||||
|
deltaImg: string,
|
||||||
|
token: string | null,
|
||||||
|
onProgress: ProgressCallback,
|
||||||
|
log: (str: string) => void,
|
||||||
|
): Promise<string> {
|
||||||
|
log('Applying balena delta...');
|
||||||
|
|
||||||
|
let auth: Dictionary<unknown> | undefined;
|
||||||
|
if (token != null) {
|
||||||
|
log('Using registry auth token');
|
||||||
|
auth = {
|
||||||
|
authconfig: {
|
||||||
|
registrytoken: token,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
await docker.dockerProgress.pull(deltaImg, onProgress, auth);
|
||||||
|
return (await docker.getImage(deltaImg).inspect()).Id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default DockerUtils;
|
@ -5,8 +5,8 @@ import { checkInt } from './validation';
|
|||||||
|
|
||||||
// To keep the bluebird typings happy, we need to accept
|
// To keep the bluebird typings happy, we need to accept
|
||||||
// an error, and in this case, it would also contain a status code
|
// an error, and in this case, it would also contain a status code
|
||||||
interface StatusCodeError extends Error {
|
export interface StatusCodeError extends Error {
|
||||||
statusCode?: string;
|
statusCode?: string | number;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CodedSysError extends Error {
|
interface CodedSysError extends Error {
|
||||||
|
@ -7,6 +7,8 @@ import * as osRelease from './os-release';
|
|||||||
|
|
||||||
import supervisorVersion = require('./supervisor-version');
|
import supervisorVersion = require('./supervisor-version');
|
||||||
|
|
||||||
|
export { requestLib };
|
||||||
|
|
||||||
const osVersion = osRelease.getOSVersionSync(constants.hostOSVersionPath);
|
const osVersion = osRelease.getOSVersionSync(constants.hostOSVersionPath);
|
||||||
const osVariant = osRelease.getOSVariantSync(constants.hostOSVersionPath);
|
const osVariant = osRelease.getOSVariantSync(constants.hostOSVersionPath);
|
||||||
|
|
||||||
|
@ -3,14 +3,14 @@ import * as es from 'event-stream';
|
|||||||
import * as _ from 'lodash';
|
import * as _ from 'lodash';
|
||||||
|
|
||||||
import { EventTracker } from './event-tracker';
|
import { EventTracker } from './event-tracker';
|
||||||
import Docker = require('./lib/docker-utils');
|
import Docker from './lib/docker-utils';
|
||||||
import { LogType } from './lib/log-types';
|
import { LogType } from './lib/log-types';
|
||||||
import { writeLock } from './lib/update-lock';
|
import { writeLock } from './lib/update-lock';
|
||||||
import {
|
import {
|
||||||
|
BalenaLogBackend,
|
||||||
LocalLogBackend,
|
LocalLogBackend,
|
||||||
LogBackend,
|
LogBackend,
|
||||||
LogMessage,
|
LogMessage,
|
||||||
BalenaLogBackend,
|
|
||||||
} from './logging-backends';
|
} from './logging-backends';
|
||||||
|
|
||||||
interface LoggerSetupOptions {
|
interface LoggerSetupOptions {
|
||||||
@ -134,7 +134,7 @@ export class Logger {
|
|||||||
public attach(
|
public attach(
|
||||||
docker: Docker,
|
docker: Docker,
|
||||||
containerId: string,
|
containerId: string,
|
||||||
serviceInfo: { serviceId: string; imageId: string },
|
serviceInfo: { serviceId: number; imageId: number },
|
||||||
): Bluebird<void> {
|
): Bluebird<void> {
|
||||||
return Bluebird.using(this.lock(containerId), () => {
|
return Bluebird.using(this.lock(containerId), () => {
|
||||||
return this.attachStream(
|
return this.attachStream(
|
||||||
@ -199,7 +199,7 @@ export class Logger {
|
|||||||
docker: Docker,
|
docker: Docker,
|
||||||
streamType: OutputStream,
|
streamType: OutputStream,
|
||||||
containerId: string,
|
containerId: string,
|
||||||
{ serviceId, imageId }: { serviceId: string; imageId: string },
|
{ serviceId, imageId }: { serviceId: number; imageId: number },
|
||||||
): Bluebird<void> {
|
): Bluebird<void> {
|
||||||
return Bluebird.try(() => {
|
return Bluebird.try(() => {
|
||||||
if (this.attached[streamType][containerId]) {
|
if (this.attached[streamType][containerId]) {
|
||||||
|
1
typings/JSONStream.d.ts
vendored
Normal file
1
typings/JSONStream.d.ts
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
declare module 'JSONStream';
|
12
typings/docker-delta.d.ts
vendored
Normal file
12
typings/docker-delta.d.ts
vendored
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
declare module 'docker-delta' {
|
||||||
|
// Incomplete type definitions
|
||||||
|
import TypedError = require('typed-error');
|
||||||
|
import { Duplex } from 'stream';
|
||||||
|
|
||||||
|
export class OutOfSyncError extends TypedError {}
|
||||||
|
|
||||||
|
export function applyDelta(
|
||||||
|
imageSource: string,
|
||||||
|
opts: { log: (str: string) => void; timeout: number },
|
||||||
|
): Duplex;
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user