mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-04-12 21:52:57 +00:00
refactor: Convert ServiceManager to typescript
Change-type: minor Signed-off-by: Cameron Diver <cameron@balena.io>
This commit is contained in:
parent
e5893c0ee0
commit
ba000a73fc
@ -15,7 +15,7 @@ updateLock = require './lib/update-lock'
|
||||
{ NotFoundError } = require './lib/errors'
|
||||
{ pathExistsOnHost } = require './lib/fs-utils'
|
||||
|
||||
ServiceManager = require './compose/service-manager'
|
||||
{ ServiceManager } = require './compose/service-manager'
|
||||
{ Service } = require './compose/service'
|
||||
{ Images } = require './compose/images'
|
||||
{ NetworkManager } = require './compose/network-manager'
|
||||
|
2
src/application-manager.d.ts
vendored
2
src/application-manager.d.ts
vendored
@ -7,7 +7,7 @@ import { Logger } from './logger';
|
||||
import { EventTracker } from './event-tracker';
|
||||
|
||||
import Images from './compose/images';
|
||||
import ServiceManager = require('./compose/service-manager');
|
||||
import ServiceManager from './compose/service-manager';
|
||||
import DB from './db';
|
||||
|
||||
import { Service } from './compose/service';
|
||||
|
@ -1,331 +0,0 @@
|
||||
Promise = require 'bluebird'
|
||||
_ = require 'lodash'
|
||||
EventEmitter = require 'events'
|
||||
JSONStream = require 'JSONStream'
|
||||
fs = Promise.promisifyAll(require('fs'))
|
||||
|
||||
logTypes = require '../lib/log-types'
|
||||
{ checkInt, isValidDeviceName } = require '../lib/validation'
|
||||
constants = require '../lib/constants'
|
||||
|
||||
{ Service } = require './service'
|
||||
|
||||
{ NotFoundError } = require '../lib/errors'
|
||||
|
||||
module.exports = class ServiceManager extends EventEmitter
|
||||
constructor: ({ @docker, @logger, @config }) ->
|
||||
@containerHasDied = {}
|
||||
@listening = false
|
||||
# Volatile state of containers, indexed by containerId (or random strings if we don't have a containerId yet)
|
||||
@volatileState = {}
|
||||
|
||||
killAllLegacy: =>
|
||||
# Containers haven't been normalized (this is an updated supervisor)
|
||||
# so we need to stop and remove them
|
||||
@docker.getImage(constants.supervisorImage).inspect()
|
||||
.then (supervisorImage) =>
|
||||
Promise.map @docker.listContainers(all: true), (container) =>
|
||||
if container.ImageID != supervisorImage.Id
|
||||
@_killContainer(container.Id, { serviceName: 'legacy', image: container.ImageID }, { removeContainer: true })
|
||||
|
||||
reportChange: (containerId, status) ->
|
||||
if status?
|
||||
@volatileState[containerId] ?= {}
|
||||
_.merge(@volatileState[containerId], status)
|
||||
else if containerId? and @volatileState[containerId]?
|
||||
delete @volatileState[containerId]
|
||||
@emit('change')
|
||||
|
||||
reportNewStatus: (containerId, service, status) =>
|
||||
@reportChange(containerId, _.merge({ status }, _.pick(service, [ 'imageId', 'appId', 'releaseId', 'commit' ])))
|
||||
|
||||
_killContainer: (containerId, service = {}, { removeContainer = true, wait = false }) =>
|
||||
Promise.try =>
|
||||
@logger.logSystemEvent(logTypes.stopService, { service })
|
||||
if service.imageId?
|
||||
@reportNewStatus(containerId, service, 'Stopping')
|
||||
containerObj = @docker.getContainer(containerId)
|
||||
killPromise = containerObj.stop().then ->
|
||||
if removeContainer
|
||||
containerObj.remove(v: true)
|
||||
.catch (err) =>
|
||||
# Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison
|
||||
# reasons.
|
||||
statusCode = checkInt(err.statusCode)
|
||||
# 304 means the container was already stopped - so we can just remove it
|
||||
if statusCode is 304
|
||||
@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
|
||||
if removeContainer
|
||||
return containerObj.remove(v: true)
|
||||
return
|
||||
# 404 means the container doesn't exist, precisely what we want! :D
|
||||
if statusCode is 404
|
||||
@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
|
||||
return
|
||||
throw err
|
||||
.tap =>
|
||||
delete @containerHasDied[containerId]
|
||||
@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
|
||||
.catch (err) =>
|
||||
@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
|
||||
.finally =>
|
||||
if service.imageId?
|
||||
@reportChange(containerId)
|
||||
if wait
|
||||
return killPromise
|
||||
return null
|
||||
|
||||
kill: (service, { removeContainer = true, wait = false } = {}) =>
|
||||
@_killContainer(service.containerId, service, { removeContainer, wait })
|
||||
|
||||
remove: (service) =>
|
||||
@logger.logSystemEvent(logTypes.removeDeadService, { service })
|
||||
@get(service)
|
||||
.then (existingService) =>
|
||||
@docker.getContainer(existingService.containerId).remove(v: true)
|
||||
.catchReturn(NotFoundError, null)
|
||||
.tapCatch (err) =>
|
||||
@logger.logSystemEvent(logTypes.removeDeadServiceError, { service, error: err })
|
||||
|
||||
getAllByAppId: (appId) =>
|
||||
@getAll("app-id=#{appId}")
|
||||
|
||||
stopAllByAppId: (appId) =>
|
||||
Promise.map @getAllByAppId(appId), (service) =>
|
||||
@kill(service, { removeContainer: false })
|
||||
|
||||
create: (service) =>
|
||||
mockContainerId = @config.newUniqueKey()
|
||||
@get(service)
|
||||
.then (existingService) =>
|
||||
return @docker.getContainer(existingService.containerId)
|
||||
.catch NotFoundError, =>
|
||||
|
||||
@config.get('name')
|
||||
.then (deviceName) =>
|
||||
if !isValidDeviceName(deviceName)
|
||||
throw new Error(
|
||||
'The device name contains a newline, which is unsupported by balena. ' +
|
||||
'Please fix the device name.'
|
||||
)
|
||||
|
||||
conf = service.toDockerContainer({ deviceName })
|
||||
nets = service.extraNetworksToJoin()
|
||||
|
||||
@logger.logSystemEvent(logTypes.installService, { service })
|
||||
@reportNewStatus(mockContainerId, service, 'Installing')
|
||||
|
||||
@docker.createContainer(conf)
|
||||
.tap (container) =>
|
||||
service.containerId = container.id
|
||||
|
||||
Promise.all _.map nets, (endpointConfig, name) =>
|
||||
@docker
|
||||
.getNetwork(name)
|
||||
.connect({ Container: container.id, EndpointConfig: endpointConfig })
|
||||
.tap =>
|
||||
@logger.logSystemEvent(logTypes.installServiceSuccess, { service })
|
||||
.tapCatch (err) =>
|
||||
@logger.logSystemEvent(logTypes.installServiceError, { service, error: err })
|
||||
.finally =>
|
||||
@reportChange(mockContainerId)
|
||||
|
||||
start: (service) =>
|
||||
alreadyStarted = false
|
||||
containerId = null
|
||||
@create(service)
|
||||
.tap (container) =>
|
||||
containerId = container.id
|
||||
@logger.logSystemEvent(logTypes.startService, { service })
|
||||
@reportNewStatus(containerId, service, 'Starting')
|
||||
container.start()
|
||||
.catch (err) =>
|
||||
statusCode = checkInt(err.statusCode)
|
||||
# 304 means the container was already started, precisely what we want :)
|
||||
if statusCode is 304
|
||||
alreadyStarted = true
|
||||
return
|
||||
|
||||
if statusCode is 500 and err.message?.trim?()?.match(/exec format error$/)
|
||||
# Provide a friendlier error message for "exec format error"
|
||||
@config.get('deviceType')
|
||||
.then (deviceType) ->
|
||||
throw new Error("Application architecture incompatible with #{deviceType}: exec format error")
|
||||
else
|
||||
# rethrow the same error
|
||||
throw err
|
||||
.tapCatch (err) =>
|
||||
# If starting the container failed, we remove it so that it doesn't litter
|
||||
container.remove(v: true)
|
||||
.catchReturn()
|
||||
.then =>
|
||||
@logger.logSystemEvent(logTypes.startServiceError, { service, error: err })
|
||||
.then =>
|
||||
@logger.attach(@docker, container.id, service)
|
||||
.tap =>
|
||||
if alreadyStarted
|
||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
||||
else
|
||||
@logger.logSystemEvent(logTypes.startServiceSuccess, { service })
|
||||
.tap ->
|
||||
service.running = true
|
||||
.finally =>
|
||||
@reportChange(containerId)
|
||||
|
||||
_listWithBothLabels: (labelList) =>
|
||||
listWithPrefix = (prefix) =>
|
||||
@docker.listContainers({ all: true, filters: label: _.map(labelList, (v) -> prefix + v) })
|
||||
Promise.join(
|
||||
listWithPrefix('io.resin.')
|
||||
listWithPrefix('io.balena.')
|
||||
(legacyContainers, currentContainers) ->
|
||||
_.unionBy(legacyContainers, currentContainers, 'Id')
|
||||
)
|
||||
|
||||
# Gets all existing containers that correspond to apps
|
||||
getAll: (extraLabelFilters = []) =>
|
||||
filterLabels = [ 'supervised' ].concat(extraLabelFilters)
|
||||
@_listWithBothLabels(filterLabels)
|
||||
.mapSeries (container) =>
|
||||
@docker.getContainer(container.Id).inspect()
|
||||
.then(Service.fromDockerContainer)
|
||||
.then (service) =>
|
||||
if @volatileState[service.containerId]?.status?
|
||||
service.status = @volatileState[service.containerId].status
|
||||
return service
|
||||
.catchReturn(NotFoundError, null)
|
||||
.filter(_.negate(_.isNil))
|
||||
|
||||
# Returns the first container matching a service definition
|
||||
get: (service) =>
|
||||
@getAll("service-id=#{service.serviceId}")
|
||||
.filter((currentService) -> currentService.isEqualConfig(service))
|
||||
.then (services) ->
|
||||
if services.length == 0
|
||||
e = new Error('Could not find a container matching this service definition')
|
||||
e.statusCode = 404
|
||||
throw e
|
||||
return services[0]
|
||||
|
||||
getStatus: =>
|
||||
@getAll()
|
||||
.then (services) =>
|
||||
status = _.clone(@volatileState)
|
||||
for service in services
|
||||
status[service.containerId] ?= _.pick(service, [
|
||||
'appId',
|
||||
'imageId',
|
||||
'status',
|
||||
'releaseId',
|
||||
'commit',
|
||||
'createdAt',
|
||||
'serviceName',
|
||||
])
|
||||
return _.values(status)
|
||||
|
||||
getByDockerContainerId: (containerId) =>
|
||||
@docker.getContainer(containerId).inspect()
|
||||
.then (container) ->
|
||||
if !(container.Config.Labels['io.balena.supervised']? or container.Config.Labels['io.resin.supervised']?)
|
||||
return null
|
||||
return Service.fromDockerContainer(container)
|
||||
|
||||
waitToKill: (service, timeout) ->
|
||||
pollInterval = 100
|
||||
timeout = checkInt(timeout, positive: true) ? 60000
|
||||
deadline = Date.now() + timeout
|
||||
|
||||
handoverCompletePaths = service.handoverCompleteFullPathsOnHost()
|
||||
|
||||
wait = ->
|
||||
Promise.any _.map handoverCompletePaths, (file) ->
|
||||
fs.statAsync(file)
|
||||
.then ->
|
||||
fs.unlinkAsync(file).catch(_.noop)
|
||||
.catch ->
|
||||
if Date.now() < deadline
|
||||
Promise.delay(pollInterval).then(wait)
|
||||
else
|
||||
console.log('Handover timeout has passed, assuming handover was completed')
|
||||
|
||||
console.log('Waiting for handover to be completed')
|
||||
wait()
|
||||
.then ->
|
||||
console.log('Handover complete')
|
||||
|
||||
prepareForHandover: (service) =>
|
||||
@get(service)
|
||||
.then (svc) =>
|
||||
container = @docker.getContainer(svc.containerId)
|
||||
container.update(RestartPolicy: {})
|
||||
.then ->
|
||||
container.rename(name: "old_#{service.serviceName}_#{service.imageId}_#{service.releaseId}")
|
||||
|
||||
updateMetadata: (service, { imageId, releaseId }) =>
|
||||
@get(service)
|
||||
.then (svc) =>
|
||||
@docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{imageId}_#{releaseId}")
|
||||
.then =>
|
||||
@reportChange()
|
||||
|
||||
handover: (currentService, targetService) =>
|
||||
# We set the running container to not restart so that in case of a poweroff
|
||||
# it doesn't come back after boot.
|
||||
@prepareForHandover(currentService)
|
||||
.then =>
|
||||
@start(targetService)
|
||||
.then =>
|
||||
@waitToKill(currentService, targetService.config.labels['io.balena.update.handover-timeout'])
|
||||
.then =>
|
||||
@kill(currentService)
|
||||
|
||||
listenToEvents: =>
|
||||
if @listening
|
||||
return
|
||||
@listening = true
|
||||
@docker.getEvents(filters: type: [ 'container' ])
|
||||
.then (stream) =>
|
||||
stream.on 'error', (err) ->
|
||||
console.error('Error on docker events stream:', err)
|
||||
parser = JSONStream.parse()
|
||||
parser.on 'data', (data) =>
|
||||
if data?.status in ['die', 'start']
|
||||
@getByDockerContainerId(data.id)
|
||||
.catchReturn(NotFoundError, null)
|
||||
.then (service) =>
|
||||
if service?
|
||||
if data.status == 'die'
|
||||
@emit('change')
|
||||
@logger.logSystemEvent(logTypes.serviceExit, { service })
|
||||
@containerHasDied[data.id] = true
|
||||
else if data.status == 'start' and @containerHasDied[data.id]
|
||||
@emit('change')
|
||||
delete @containerHasDied[data.id]
|
||||
@logger.logSystemEvent(logTypes.serviceRestart, { service })
|
||||
@logger.attach(@docker, data.id, service)
|
||||
.catch (err) ->
|
||||
console.error('Error on docker event:', err, err.stack)
|
||||
return
|
||||
new Promise (resolve, reject) ->
|
||||
parser
|
||||
.on 'error', (err) ->
|
||||
console.error('Error on docker events stream', err)
|
||||
reject(err)
|
||||
.on 'end', ->
|
||||
resolve()
|
||||
stream.pipe(parser)
|
||||
.catch (err) ->
|
||||
console.error('Error listening to events:', err, err.stack)
|
||||
.finally =>
|
||||
@listening = false
|
||||
setTimeout =>
|
||||
@listenToEvents()
|
||||
, 1000
|
||||
return
|
||||
|
||||
attachToRunning: =>
|
||||
@getAll()
|
||||
.map (service) =>
|
||||
if service.status == 'Running'
|
||||
@logger.logSystemEvent(logTypes.startServiceNoop, { service })
|
||||
@logger.attach(@docker, service.containerId, service)
|
12
src/compose/service-manager.d.ts
vendored
12
src/compose/service-manager.d.ts
vendored
@ -1,12 +0,0 @@
|
||||
import * as Bluebird from 'bluebird';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
import { Service } from '../compose/service';
|
||||
|
||||
// FIXME: Unfinished definition for this class...
|
||||
declare class ServiceManager extends EventEmitter {
|
||||
public getStatus(): Service[];
|
||||
public getAll(): Bluebird<Service[]>;
|
||||
}
|
||||
|
||||
export = ServiceManager;
|
660
src/compose/service-manager.ts
Normal file
660
src/compose/service-manager.ts
Normal file
@ -0,0 +1,660 @@
|
||||
import * as Bluebird from 'bluebird';
|
||||
import * as Dockerode from 'dockerode';
|
||||
import { EventEmitter } from 'events';
|
||||
import * as JSONStream from 'JSONStream';
|
||||
import * as _ from 'lodash';
|
||||
import { fs } from 'mz';
|
||||
import StrictEventEmitter from 'strict-event-emitter-types';
|
||||
|
||||
import Config from '../config';
|
||||
import Docker from '../lib/docker-utils';
|
||||
import Logger from '../logger';
|
||||
|
||||
import { PermissiveNumber } from '../config/types';
|
||||
import constants = require('../lib/constants');
|
||||
import {
|
||||
InternalInconsistencyError,
|
||||
NotFoundError,
|
||||
StatusCodeError,
|
||||
} from '../lib/errors';
|
||||
import * as LogTypes from '../lib/log-types';
|
||||
import { checkInt, isValidDeviceName } from '../lib/validation';
|
||||
import { Service } from './service';
|
||||
|
||||
interface ServiceConstructOpts {
|
||||
docker: Docker;
|
||||
logger: Logger;
|
||||
config: Config;
|
||||
}
|
||||
|
||||
interface ServiceManagerEvents {
|
||||
change: void;
|
||||
}
|
||||
type ServiceManagerEventEmitter = StrictEventEmitter<
|
||||
EventEmitter,
|
||||
ServiceManagerEvents
|
||||
>;
|
||||
|
||||
interface KillOpts {
|
||||
removeContainer?: boolean;
|
||||
wait?: boolean;
|
||||
}
|
||||
|
||||
export class ServiceManager extends (EventEmitter as {
|
||||
new (): ServiceManagerEventEmitter;
|
||||
}) {
|
||||
private docker: Docker;
|
||||
private logger: Logger;
|
||||
private config: Config;
|
||||
|
||||
// Whether a container has died, indexed by ID
|
||||
private containerHasDied: Dictionary<boolean> = {};
|
||||
private listening = false;
|
||||
// Volatile state of containers, indexed by containerId (or random strings if
|
||||
// we don't yet have an id)
|
||||
private volatileState: Dictionary<Partial<Service>> = {};
|
||||
|
||||
public constructor(opts: ServiceConstructOpts) {
|
||||
super();
|
||||
this.docker = opts.docker;
|
||||
this.logger = opts.logger;
|
||||
this.config = opts.config;
|
||||
}
|
||||
|
||||
public async getAll(
|
||||
extraLabelFilters: string | string[] = [],
|
||||
): Promise<Service[]> {
|
||||
const filterLabels = ['supervised'].concat(extraLabelFilters);
|
||||
const containers = await this.listWithBothLabels(filterLabels);
|
||||
|
||||
const services = await Bluebird.map(containers, async container => {
|
||||
try {
|
||||
const serviceInspect = await this.docker
|
||||
.getContainer(container.Id)
|
||||
.inspect();
|
||||
const service = Service.fromDockerContainer(serviceInspect);
|
||||
// We know that the containerId is set below, because `fromDockerContainer`
|
||||
// always sets it
|
||||
const vState = this.volatileState[service.containerId!];
|
||||
if (vState != null && vState.status != null) {
|
||||
service.status = vState.status;
|
||||
}
|
||||
return service;
|
||||
} catch (e) {
|
||||
if (NotFoundError(e)) {
|
||||
return null;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
|
||||
return services.filter(s => s != null) as Service[];
|
||||
}
|
||||
|
||||
public async get(service: Service) {
|
||||
const services = (await this.getAll(
|
||||
`service-id=${service.serviceId}`,
|
||||
)).filter(currentService => currentService.isEqualConfig(service));
|
||||
|
||||
if (services.length === 0) {
|
||||
const e: StatusCodeError = new Error(
|
||||
'Could not find a container matching this service definition',
|
||||
);
|
||||
e.statusCode = 404;
|
||||
throw e;
|
||||
}
|
||||
return services[0];
|
||||
}
|
||||
|
||||
public async getStatus() {
|
||||
const services = await this.getAll();
|
||||
const status = _.clone(this.volatileState);
|
||||
|
||||
for (const service of services) {
|
||||
if (service.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`containerId not defined in ServiceManager.getStatus: ${service}`,
|
||||
);
|
||||
}
|
||||
if (status[service.containerId] == null) {
|
||||
status[service.containerId] = _.pick(service, [
|
||||
'appId',
|
||||
'imageId',
|
||||
'status',
|
||||
'releaseId',
|
||||
'commit',
|
||||
'createdAt',
|
||||
'serviceName',
|
||||
]) as Partial<Service>;
|
||||
}
|
||||
}
|
||||
|
||||
return _.values(status);
|
||||
}
|
||||
|
||||
public async getByDockerContainerId(
|
||||
containerId: string,
|
||||
): Promise<Service | null> {
|
||||
const container = await this.docker.getContainer(containerId).inspect();
|
||||
if (
|
||||
container.Config.Labels['io.balena.supervised'] == null &&
|
||||
container.Config.Labels['io.resin.supervised'] == null
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return Service.fromDockerContainer(container);
|
||||
}
|
||||
|
||||
public async updateMetadata(
|
||||
service: Service,
|
||||
metadata: { imageId: number; releaseId: number },
|
||||
) {
|
||||
const svc = await this.get(service);
|
||||
if (svc.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No containerId provided for service ${
|
||||
service.serviceName
|
||||
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||
);
|
||||
}
|
||||
|
||||
await this.docker.getContainer(svc.containerId).rename({
|
||||
name: `${service.serviceName}_${metadata.imageId}_${metadata.releaseId}`,
|
||||
});
|
||||
}
|
||||
|
||||
public async handover(current: Service, target: Service) {
|
||||
// We set the running container to not restart so that in case of a poweroff
|
||||
// it doesn't come back after boot.
|
||||
await this.prepareForHandover(current);
|
||||
await this.start(target);
|
||||
await this.waitToKill(
|
||||
current,
|
||||
target.config.labels['io.balena.update.handover-timeout'],
|
||||
);
|
||||
await this.kill(current);
|
||||
}
|
||||
|
||||
public async killAllLegacy(): Promise<void> {
|
||||
// Containers haven't been normalized (this is an updated supervisor)
|
||||
// so we need to stop and remove them
|
||||
const supervisorImageId = (await this.docker
|
||||
.getImage(constants.supervisorImage)
|
||||
.inspect()).Id;
|
||||
|
||||
for (const container of await this.docker.listContainers({ all: true })) {
|
||||
if (container.ImageID !== supervisorImageId) {
|
||||
await this.killContainer(container.Id, {
|
||||
serviceName: 'legacy',
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public kill(service: Service, opts: KillOpts = {}) {
|
||||
if (service.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`Attempt to kill container without containerId! Service :${service}`,
|
||||
);
|
||||
}
|
||||
return this.killContainer(service.containerId, service, opts);
|
||||
}
|
||||
|
||||
public async remove(service: Service) {
|
||||
this.logger.logSystemEvent(LogTypes.removeDeadService, { service });
|
||||
const existingService = await this.get(service);
|
||||
|
||||
if (existingService.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No containerId provided for service ${
|
||||
service.serviceName
|
||||
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
await this.docker
|
||||
.getContainer(existingService.containerId)
|
||||
.remove({ v: true });
|
||||
} catch (e) {
|
||||
if (!NotFoundError(e)) {
|
||||
this.logger.logSystemEvent(LogTypes.removeDeadServiceError, {
|
||||
service,
|
||||
error: e,
|
||||
});
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async create(service: Service) {
|
||||
const mockContainerId = this.config.newUniqueKey();
|
||||
try {
|
||||
const existing = await this.get(service);
|
||||
if (existing.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No containerId provided for service ${
|
||||
service.serviceName
|
||||
} in ServiceManager.updateMetadata. Service: ${service}`,
|
||||
);
|
||||
}
|
||||
return this.docker.getContainer(existing.containerId);
|
||||
} catch (e) {
|
||||
if (!NotFoundError(e)) {
|
||||
this.logger.logSystemEvent(LogTypes.installServiceError, {
|
||||
service,
|
||||
error: e,
|
||||
});
|
||||
throw e;
|
||||
}
|
||||
|
||||
const deviceName = await this.config.get('name');
|
||||
if (!isValidDeviceName(deviceName)) {
|
||||
throw new Error(
|
||||
'The device name contains a newline, which is unsupported by balena. ' +
|
||||
'Please fix the device name',
|
||||
);
|
||||
}
|
||||
|
||||
const conf = service.toDockerContainer({ deviceName });
|
||||
const nets = service.extraNetworksToJoin();
|
||||
|
||||
this.logger.logSystemEvent(LogTypes.installService, { service });
|
||||
this.reportNewStatus(mockContainerId, service, 'Installing');
|
||||
|
||||
const container = await this.docker.createContainer(conf);
|
||||
service.containerId = container.id;
|
||||
|
||||
await Promise.all(
|
||||
_.map(nets, (endpointConfig, name) =>
|
||||
this.docker.getNetwork(name).connect({
|
||||
Container: container.id,
|
||||
EndpointConfig: endpointConfig,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
this.logger.logSystemEvent(LogTypes.installServiceSuccess, { service });
|
||||
return container;
|
||||
} finally {
|
||||
this.reportChange(mockContainerId);
|
||||
}
|
||||
}
|
||||
|
||||
public async start(service: Service) {
|
||||
let alreadyStarted = false;
|
||||
let containerId: string | null = null;
|
||||
|
||||
try {
|
||||
const container = await this.create(service);
|
||||
containerId = container.id;
|
||||
this.logger.logSystemEvent(LogTypes.startService, { service });
|
||||
|
||||
this.reportNewStatus(containerId, service, 'Starting');
|
||||
|
||||
let remove = false;
|
||||
let err: Error | undefined;
|
||||
try {
|
||||
await container.start();
|
||||
} catch (e) {
|
||||
// Get the statusCode from the original cause and make sure it's
|
||||
// definitely an int for comparison reasons
|
||||
const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
|
||||
if (maybeStatusCode.isLeft()) {
|
||||
remove = true;
|
||||
err = new Error(
|
||||
`Could not parse status code from docker error: ${e}`,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
const statusCode = maybeStatusCode.value;
|
||||
const message = e.message;
|
||||
|
||||
// 304 means the container was already started, precisely what we want
|
||||
if (statusCode === 304) {
|
||||
alreadyStarted = true;
|
||||
} else if (
|
||||
statusCode === 500 &&
|
||||
_.isString(message) &&
|
||||
message.trim().match(/exec format error$/)
|
||||
) {
|
||||
// Provide a friendlier error message for "exec format error"
|
||||
const deviceType = await this.config.get('deviceType');
|
||||
err = new Error(
|
||||
`Application architecture incompatible with ${deviceType}: exec format error`,
|
||||
);
|
||||
throw err;
|
||||
} else {
|
||||
// rethrow the same error
|
||||
err = e;
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
if (remove) {
|
||||
// If starting the container fialed, we remove it so that it doesn't litter
|
||||
await container.remove({ v: true }).catch(_.noop);
|
||||
this.logger.logSystemEvent(LogTypes.startServiceError, {
|
||||
service,
|
||||
error: err,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const serviceId = service.serviceId;
|
||||
const imageId = service.imageId;
|
||||
if (serviceId == null || imageId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`serviceId and imageId not defined for service: ${
|
||||
service.serviceName
|
||||
} in ServiceManager.start`,
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.attach(this.docker, container.id, { serviceId, imageId });
|
||||
|
||||
if (alreadyStarted) {
|
||||
this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
|
||||
} else {
|
||||
this.logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
|
||||
}
|
||||
|
||||
service.config.running = true;
|
||||
return container;
|
||||
} finally {
|
||||
if (containerId != null) {
|
||||
this.reportChange(containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public listenToEvents() {
|
||||
if (this.listening) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.listening = true;
|
||||
|
||||
const listen = async () => {
|
||||
const stream = await this.docker.getEvents({
|
||||
filters: { type: ['container'] },
|
||||
});
|
||||
|
||||
stream.on('error', e => {
|
||||
console.error(`Error on docker events stream:`, e);
|
||||
});
|
||||
const parser = JSONStream.parse();
|
||||
parser.on('data', async (data: { status: string; id: string }) => {
|
||||
if (data != null) {
|
||||
const status = data.status;
|
||||
if (status === 'die' || status === 'start') {
|
||||
try {
|
||||
let service: Service | null = null;
|
||||
try {
|
||||
service = await this.getByDockerContainerId(data.id);
|
||||
} catch (e) {
|
||||
if (!NotFoundError(e)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (service != null) {
|
||||
this.emit('change');
|
||||
if (status === 'die') {
|
||||
this.logger.logSystemEvent(LogTypes.serviceExit, { service });
|
||||
this.containerHasDied[data.id] = true;
|
||||
} else if (
|
||||
status === 'start' &&
|
||||
this.containerHasDied[data.id]
|
||||
) {
|
||||
delete this.containerHasDied[data.id];
|
||||
this.logger.logSystemEvent(LogTypes.serviceRestart, {
|
||||
service,
|
||||
});
|
||||
|
||||
const serviceId = service.serviceId;
|
||||
const imageId = service.imageId;
|
||||
if (serviceId == null || imageId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`serviceId and imageId not defined for service: ${
|
||||
service.serviceName
|
||||
} in ServiceManager.listenToEvents`,
|
||||
);
|
||||
}
|
||||
this.logger.attach(this.docker, data.id, {
|
||||
serviceId,
|
||||
imageId,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error on docker event:', e, e.stack);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
parser
|
||||
.on('error', (e: Error) => {
|
||||
console.error('Error on docker events stream:', e);
|
||||
reject(e);
|
||||
})
|
||||
.on('end', resolve);
|
||||
stream.pipe(parser);
|
||||
});
|
||||
};
|
||||
|
||||
Bluebird.resolve(listen())
|
||||
.catch(e => {
|
||||
console.error('Error listening to events:', e, e.stack);
|
||||
})
|
||||
.finally(() => {
|
||||
this.listening = false;
|
||||
setTimeout(() => this.listenToEvents(), 1000);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
public async attachToRunning() {
|
||||
const services = await this.getAll();
|
||||
for (const service of services) {
|
||||
if (service.status === 'Running') {
|
||||
this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
|
||||
|
||||
const serviceId = service.serviceId;
|
||||
const imageId = service.imageId;
|
||||
if (serviceId == null || imageId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`serviceId and imageId not defined for service: ${
|
||||
service.serviceName
|
||||
} in ServiceManager.start`,
|
||||
);
|
||||
}
|
||||
|
||||
if (service.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`containerId not defined for service: ${
|
||||
service.serviceName
|
||||
} in ServiceManager.attachToRunning`,
|
||||
);
|
||||
}
|
||||
this.logger.attach(this.docker, service.containerId, {
|
||||
serviceId,
|
||||
imageId,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private reportChange(containerId?: string, status?: Partial<Service>) {
|
||||
if (containerId != null) {
|
||||
if (status != null) {
|
||||
this.volatileState[containerId] = {};
|
||||
_.merge(this.volatileState[containerId], status);
|
||||
} else if (this.volatileState[containerId] != null) {
|
||||
delete this.volatileState[containerId];
|
||||
}
|
||||
}
|
||||
this.emit('change');
|
||||
}
|
||||
|
||||
private reportNewStatus(
|
||||
containerId: string,
|
||||
service: Partial<Service>,
|
||||
status: string,
|
||||
) {
|
||||
this.reportChange(
|
||||
containerId,
|
||||
_.merge(
|
||||
{ status },
|
||||
_.pick(service, ['imageId', 'appId', 'releaseId', 'commit']),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
private killContainer(
|
||||
containerId: string,
|
||||
service: Partial<Service> = {},
|
||||
{ removeContainer = true, wait = false }: KillOpts = {},
|
||||
): Bluebird<void> {
|
||||
// To maintain compatibility of the `wait` flag, this function is not
|
||||
// async, but it feels like whether or not the promise should be waited on
|
||||
// should performed by the caller
|
||||
// TODO: Remove the need for the wait flag
|
||||
|
||||
return Bluebird.try(() => {
|
||||
this.logger.logSystemEvent(LogTypes.stopService, { service });
|
||||
if (service.imageId != null) {
|
||||
this.reportNewStatus(containerId, service, 'Stopping');
|
||||
}
|
||||
|
||||
const containerObj = this.docker.getContainer(containerId);
|
||||
const killPromise = Bluebird.resolve(containerObj.stop())
|
||||
.then(() => {
|
||||
if (removeContainer) {
|
||||
return containerObj.remove({ v: true });
|
||||
}
|
||||
})
|
||||
.catch(e => {
|
||||
// Get the statusCode from the original cause and make sure it's
|
||||
// definitely an int for comparison reasons
|
||||
const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
|
||||
if (maybeStatusCode.isLeft()) {
|
||||
throw new Error(
|
||||
`Could not parse status code from docker error: ${e}`,
|
||||
);
|
||||
}
|
||||
const statusCode = maybeStatusCode.value;
|
||||
|
||||
// 304 means the container was already stopped, so we can just remove it
|
||||
if (statusCode === 304) {
|
||||
this.logger.logSystemEvent(LogTypes.stopServiceNoop, { service });
|
||||
// Why do we attempt to remove the container again?
|
||||
if (removeContainer) {
|
||||
return containerObj.remove({ v: true });
|
||||
}
|
||||
} else if (statusCode === 404) {
|
||||
// 404 means the container doesn't exist, precisely what we want!
|
||||
this.logger.logSystemEvent(LogTypes.stopRemoveServiceNoop, {
|
||||
service,
|
||||
});
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
})
|
||||
.tap(() => {
|
||||
delete this.containerHasDied[containerId];
|
||||
this.logger.logSystemEvent(LogTypes.stopServiceSuccess, { service });
|
||||
})
|
||||
.catch(e => {
|
||||
this.logger.logSystemEvent(LogTypes.stopServiceError, {
|
||||
service,
|
||||
error: e,
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
if (service.imageId != null) {
|
||||
this.reportChange(containerId);
|
||||
}
|
||||
});
|
||||
|
||||
if (wait) {
|
||||
return killPromise;
|
||||
}
|
||||
return;
|
||||
});
|
||||
}
|
||||
|
||||
private async listWithBothLabels(
|
||||
labelList: string[],
|
||||
): Promise<Dockerode.ContainerInfo[]> {
|
||||
const listWithPrefix = (prefix: string) =>
|
||||
this.docker.listContainers({
|
||||
all: true,
|
||||
filters: {
|
||||
label: _.map(labelList, v => `${prefix}${v}`),
|
||||
},
|
||||
});
|
||||
|
||||
const [legacy, current] = await Promise.all([
|
||||
listWithPrefix('io.resin.'),
|
||||
listWithPrefix('io.balena.'),
|
||||
]);
|
||||
|
||||
return _.unionBy(legacy, current, 'Id');
|
||||
}
|
||||
|
||||
private async prepareForHandover(service: Service) {
|
||||
const svc = await this.get(service);
|
||||
if (svc.containerId == null) {
|
||||
throw new InternalInconsistencyError(
|
||||
`No containerId provided for service ${
|
||||
service.serviceName
|
||||
} in ServiceManager.prepareForHandover. Service: ${service}`,
|
||||
);
|
||||
}
|
||||
const container = this.docker.getContainer(svc.containerId);
|
||||
await container.update({ RestartPolicy: {} });
|
||||
return await container.rename({
|
||||
name: `old_${service.serviceName}_${service.imageId}_${service.imageId}_${
|
||||
service.releaseId
|
||||
}`,
|
||||
});
|
||||
}
|
||||
|
||||
private waitToKill(service: Service, timeout: number | string) {
|
||||
const pollInterval = 100;
|
||||
timeout = checkInt(timeout, { positive: true }) || 60000;
|
||||
const deadline = Date.now() + timeout;
|
||||
|
||||
const handoverCompletePaths = service.handoverCompleteFullPathsOnHost();
|
||||
|
||||
const wait = (): Bluebird<void> =>
|
||||
Bluebird.any(
|
||||
handoverCompletePaths.map(file =>
|
||||
fs.stat(file).then(() => fs.unlink(file).catch(_.noop)),
|
||||
),
|
||||
).catch(async () => {
|
||||
if (Date.now() < deadline) {
|
||||
await Bluebird.delay(pollInterval);
|
||||
return wait();
|
||||
} else {
|
||||
console.log(
|
||||
`Handover timeout has passed, assuming handover was completed for service ${
|
||||
service.serviceName
|
||||
}`,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
console.log(
|
||||
`Waiting for handover to be completed for service: ${
|
||||
service.serviceName
|
||||
}`,
|
||||
);
|
||||
|
||||
return wait().then(() => {
|
||||
console.log(`Handover complete for service ${service.serviceName}`);
|
||||
});
|
||||
}
|
||||
}
|
1
typings/JSONStream.d.ts
vendored
Normal file
1
typings/JSONStream.d.ts
vendored
Normal file
@ -0,0 +1 @@
|
||||
declare module 'JSONStream';
|
Loading…
x
Reference in New Issue
Block a user