From ba000a73fc1873c87eb7f6003ca0ab961704184a Mon Sep 17 00:00:00 2001
From: Cameron Diver
Date: Thu, 24 Jan 2019 16:52:22 +0000
Subject: [PATCH] refactor: Convert ServiceManager to typescript

Change-type: minor
Signed-off-by: Cameron Diver
---
 src/application-manager.coffee     |   2 +-
 src/application-manager.d.ts       |   2 +-
 src/compose/service-manager.coffee | 331 ---------------
 src/compose/service-manager.d.ts   |  12 -
 src/compose/service-manager.ts     | 660 +++++++++++++++++++++++++++++
 typings/JSONStream.d.ts            |   1 +
 6 files changed, 663 insertions(+), 345 deletions(-)
 delete mode 100644 src/compose/service-manager.coffee
 delete mode 100644 src/compose/service-manager.d.ts
 create mode 100644 src/compose/service-manager.ts
 create mode 100644 typings/JSONStream.d.ts

diff --git a/src/application-manager.coffee b/src/application-manager.coffee
index 9bed15a6..e5ee1a16 100644
--- a/src/application-manager.coffee
+++ b/src/application-manager.coffee
@@ -15,7 +15,7 @@ updateLock = require './lib/update-lock'
 { NotFoundError } = require './lib/errors'
 { pathExistsOnHost } = require './lib/fs-utils'
 
-ServiceManager = require './compose/service-manager'
+{ ServiceManager } = require './compose/service-manager'
 { Service } = require './compose/service'
 { Images } = require './compose/images'
 { NetworkManager } = require './compose/network-manager'
diff --git a/src/application-manager.d.ts b/src/application-manager.d.ts
index 976c0dc4..961baef5 100644
--- a/src/application-manager.d.ts
+++ b/src/application-manager.d.ts
@@ -7,7 +7,7 @@ import { Logger } from './logger';
 import { EventTracker } from './event-tracker';
 
 import Images from './compose/images';
-import ServiceManager = require('./compose/service-manager');
+import { ServiceManager } from './compose/service-manager';
 
 import DB from './db';
 import { Service } from './compose/service';
diff --git a/src/compose/service-manager.coffee b/src/compose/service-manager.coffee
deleted file mode 100644
index 36f9c142..00000000
--- a/src/compose/service-manager.coffee
+++ /dev/null
@@ -1,331 +0,0 @@
-Promise = require 'bluebird'
-_ = require 'lodash'
-EventEmitter = require 'events'
-JSONStream = require 'JSONStream'
-fs = Promise.promisifyAll(require('fs'))
-
-logTypes = require '../lib/log-types'
-{ checkInt, isValidDeviceName } = require '../lib/validation'
-constants = require '../lib/constants'
-
-{ Service } = require './service'
-
-{ NotFoundError } = require '../lib/errors'
-
-module.exports = class ServiceManager extends EventEmitter
-	constructor: ({ @docker, @logger, @config }) ->
-		@containerHasDied = {}
-		@listening = false
-		# Volatile state of containers, indexed by containerId (or random strings if we don't have a containerId yet)
-		@volatileState = {}
-
-	killAllLegacy: =>
-		# Containers haven't been normalized (this is an updated supervisor)
-		# so we need to stop and remove them
-		@docker.getImage(constants.supervisorImage).inspect()
-		.then (supervisorImage) =>
-			Promise.map @docker.listContainers(all: true), (container) =>
-				if container.ImageID != supervisorImage.Id
-					@_killContainer(container.Id, { serviceName: 'legacy', image: container.ImageID }, { removeContainer: true })
-
-	reportChange: (containerId, status) ->
-		if status?
-			@volatileState[containerId] ?= {}
-			_.merge(@volatileState[containerId], status)
-		else if containerId? and @volatileState[containerId]?
-			delete @volatileState[containerId]
-		@emit('change')
-
-	reportNewStatus: (containerId, service, status) =>
-		@reportChange(containerId, _.merge({ status }, _.pick(service, [ 'imageId', 'appId', 'releaseId', 'commit' ])))
-
-	_killContainer: (containerId, service = {}, { removeContainer = true, wait = false }) =>
-		Promise.try =>
-			@logger.logSystemEvent(logTypes.stopService, { service })
-			if service.imageId?
-				@reportNewStatus(containerId, service, 'Stopping')
-			containerObj = @docker.getContainer(containerId)
-			killPromise = containerObj.stop().then ->
-				if removeContainer
-					containerObj.remove(v: true)
-			.catch (err) =>
-				# Get the statusCode from the original cause and make sure statusCode it's definitely a string for comparison
-				# reasons.
-				statusCode = checkInt(err.statusCode)
-				# 304 means the container was already stopped - so we can just remove it
-				if statusCode is 304
-					@logger.logSystemEvent(logTypes.stopServiceNoop, { service })
-					if removeContainer
-						return containerObj.remove(v: true)
-					return
-				# 404 means the container doesn't exist, precisely what we want! :D
-				if statusCode is 404
-					@logger.logSystemEvent(logTypes.stopRemoveServiceNoop, { service })
-					return
-				throw err
-			.tap =>
-				delete @containerHasDied[containerId]
-				@logger.logSystemEvent(logTypes.stopServiceSuccess, { service })
-			.catch (err) =>
-				@logger.logSystemEvent(logTypes.stopServiceError, { service, error: err })
-			.finally =>
-				if service.imageId?
-					@reportChange(containerId)
-			if wait
-				return killPromise
-			return null
-
-	kill: (service, { removeContainer = true, wait = false } = {}) =>
-		@_killContainer(service.containerId, service, { removeContainer, wait })
-
-	remove: (service) =>
-		@logger.logSystemEvent(logTypes.removeDeadService, { service })
-		@get(service)
-		.then (existingService) =>
-			@docker.getContainer(existingService.containerId).remove(v: true)
-		.catchReturn(NotFoundError, null)
-		.tapCatch (err) =>
-			@logger.logSystemEvent(logTypes.removeDeadServiceError, { service, error: err })
-
-	getAllByAppId: (appId) =>
-		@getAll("app-id=#{appId}")
-
-	stopAllByAppId: (appId) =>
-		Promise.map @getAllByAppId(appId), (service) =>
-			@kill(service, { removeContainer: false })
-
-	create: (service) =>
-		mockContainerId = @config.newUniqueKey()
-		@get(service)
-		.then (existingService) =>
-			return @docker.getContainer(existingService.containerId)
-		.catch NotFoundError, =>
-
-			@config.get('name')
-			.then (deviceName) =>
-				if !isValidDeviceName(deviceName)
-					throw new Error(
-						'The device name contains a newline, which is unsupported by balena. ' +
-						'Please fix the device name.'
-					)
-
-				conf = service.toDockerContainer({ deviceName })
-				nets = service.extraNetworksToJoin()
-
-				@logger.logSystemEvent(logTypes.installService, { service })
-				@reportNewStatus(mockContainerId, service, 'Installing')
-
-				@docker.createContainer(conf)
-				.tap (container) =>
-					service.containerId = container.id
-
-					Promise.all _.map nets, (endpointConfig, name) =>
-						@docker
-						.getNetwork(name)
-						.connect({ Container: container.id, EndpointConfig: endpointConfig })
-			.tap =>
-				@logger.logSystemEvent(logTypes.installServiceSuccess, { service })
-		.tapCatch (err) =>
-			@logger.logSystemEvent(logTypes.installServiceError, { service, error: err })
-		.finally =>
-			@reportChange(mockContainerId)
-
-	start: (service) =>
-		alreadyStarted = false
-		containerId = null
-		@create(service)
-		.tap (container) =>
-			containerId = container.id
-			@logger.logSystemEvent(logTypes.startService, { service })
-			@reportNewStatus(containerId, service, 'Starting')
-			container.start()
-			.catch (err) =>
-				statusCode = checkInt(err.statusCode)
-				# 304 means the container was already started, precisely what we want :)
-				if statusCode is 304
-					alreadyStarted = true
-					return
-
-				if statusCode is 500 and err.message?.trim?()?.match(/exec format error$/)
-					# Provide a friendlier error message for "exec format error"
-					@config.get('deviceType')
-					.then (deviceType) ->
-						throw new Error("Application architecture incompatible with #{deviceType}: exec format error")
-				else
-					# rethrow the same error
-					throw err
-			.tapCatch (err) =>
-				# If starting the container failed, we remove it so that it doesn't litter
-				container.remove(v: true)
-				.catchReturn()
-				.then =>
-					@logger.logSystemEvent(logTypes.startServiceError, { service, error: err })
-			.then =>
-				@logger.attach(@docker, container.id, service)
-		.tap =>
-			if alreadyStarted
-				@logger.logSystemEvent(logTypes.startServiceNoop, { service })
-			else
-				@logger.logSystemEvent(logTypes.startServiceSuccess, { service })
-		.tap ->
-			service.running = true
-		.finally =>
-			@reportChange(containerId)
-
-	_listWithBothLabels: (labelList) =>
-		listWithPrefix = (prefix) =>
-			@docker.listContainers({ all: true, filters: label: _.map(labelList, (v) -> prefix + v) })
-		Promise.join(
-			listWithPrefix('io.resin.')
-			listWithPrefix('io.balena.')
-			(legacyContainers, currentContainers) ->
-				_.unionBy(legacyContainers, currentContainers, 'Id')
-		)
-
-	# Gets all existing containers that correspond to apps
-	getAll: (extraLabelFilters = []) =>
-		filterLabels = [ 'supervised' ].concat(extraLabelFilters)
-		@_listWithBothLabels(filterLabels)
-		.mapSeries (container) =>
-			@docker.getContainer(container.Id).inspect()
-			.then(Service.fromDockerContainer)
-			.then (service) =>
-				if @volatileState[service.containerId]?.status?
-					service.status = @volatileState[service.containerId].status
-				return service
-			.catchReturn(NotFoundError, null)
-		.filter(_.negate(_.isNil))
-
-	# Returns the first container matching a service definition
-	get: (service) =>
-		@getAll("service-id=#{service.serviceId}")
-		.filter((currentService) -> currentService.isEqualConfig(service))
-		.then (services) ->
-			if services.length == 0
-				e = new Error('Could not find a container matching this service definition')
-				e.statusCode = 404
-				throw e
-			return services[0]
-
-	getStatus: =>
-		@getAll()
-		.then (services) =>
-			status = _.clone(@volatileState)
-			for service in services
-				status[service.containerId] ?= _.pick(service, [
-					'appId',
-					'imageId',
-					'status',
-					'releaseId',
-					'commit',
-					'createdAt',
-					'serviceName',
-				])
-			return _.values(status)
-
-	getByDockerContainerId: (containerId) =>
-		@docker.getContainer(containerId).inspect()
-		.then (container) ->
-			if !(container.Config.Labels['io.balena.supervised']? or container.Config.Labels['io.resin.supervised']?)
-				return null
-			return Service.fromDockerContainer(container)
-
-	waitToKill: (service, timeout) ->
-		pollInterval = 100
-		timeout = checkInt(timeout, positive: true) ? 60000
-		deadline = Date.now() + timeout
-
-		handoverCompletePaths = service.handoverCompleteFullPathsOnHost()
-
-		wait = ->
-			Promise.any _.map handoverCompletePaths, (file) ->
-				fs.statAsync(file)
-				.then ->
-					fs.unlinkAsync(file).catch(_.noop)
-			.catch ->
-				if Date.now() < deadline
-					Promise.delay(pollInterval).then(wait)
-				else
-					console.log('Handover timeout has passed, assuming handover was completed')
-
-		console.log('Waiting for handover to be completed')
-		wait()
-		.then ->
-			console.log('Handover complete')
-
-	prepareForHandover: (service) =>
-		@get(service)
-		.then (svc) =>
-			container = @docker.getContainer(svc.containerId)
-			container.update(RestartPolicy: {})
-			.then ->
-				container.rename(name: "old_#{service.serviceName}_#{service.imageId}_#{service.releaseId}")
-
-	updateMetadata: (service, { imageId, releaseId }) =>
-		@get(service)
-		.then (svc) =>
-			@docker.getContainer(svc.containerId).rename(name: "#{service.serviceName}_#{imageId}_#{releaseId}")
-		.then =>
-			@reportChange()
-
-	handover: (currentService, targetService) =>
-		# We set the running container to not restart so that in case of a poweroff
-		# it doesn't come back after boot.
-		@prepareForHandover(currentService)
-		.then =>
-			@start(targetService)
-		.then =>
-			@waitToKill(currentService, targetService.config.labels['io.balena.update.handover-timeout'])
-		.then =>
-			@kill(currentService)
-
-	listenToEvents: =>
-		if @listening
-			return
-		@listening = true
-		@docker.getEvents(filters: type: [ 'container' ])
-		.then (stream) =>
-			stream.on 'error', (err) ->
-				console.error('Error on docker events stream:', err)
-			parser = JSONStream.parse()
-			parser.on 'data', (data) =>
-				if data?.status in ['die', 'start']
-					@getByDockerContainerId(data.id)
-					.catchReturn(NotFoundError, null)
-					.then (service) =>
-						if service?
-							if data.status == 'die'
-								@emit('change')
-								@logger.logSystemEvent(logTypes.serviceExit, { service })
-								@containerHasDied[data.id] = true
-							else if data.status == 'start' and @containerHasDied[data.id]
-								@emit('change')
-								delete @containerHasDied[data.id]
-								@logger.logSystemEvent(logTypes.serviceRestart, { service })
-								@logger.attach(@docker, data.id, service)
-					.catch (err) ->
-						console.error('Error on docker event:', err, err.stack)
-				return
-			new Promise (resolve, reject) ->
-				parser
-				.on 'error', (err) ->
-					console.error('Error on docker events stream', err)
-					reject(err)
-				.on 'end', ->
-					resolve()
-				stream.pipe(parser)
-		.catch (err) ->
-			console.error('Error listening to events:', err, err.stack)
-		.finally =>
-			@listening = false
-			setTimeout =>
-				@listenToEvents()
-			, 1000
-		return
-
-	attachToRunning: =>
-		@getAll()
-		.map (service) =>
-			if service.status == 'Running'
-				@logger.logSystemEvent(logTypes.startServiceNoop, { service })
-				@logger.attach(@docker, service.containerId, service)
diff --git a/src/compose/service-manager.d.ts b/src/compose/service-manager.d.ts
deleted file mode 100644
index 28cc0ba4..00000000
--- a/src/compose/service-manager.d.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-import * as Bluebird from 'bluebird';
-import { EventEmitter } from 'events';
-
-import { Service } from '../compose/service';
-
-// FIXME: Unfinished definition for this class...
-declare class ServiceManager extends EventEmitter {
-	public getStatus(): Service[];
-	public getAll(): Bluebird<Service[]>;
-}
-
-export = ServiceManager;
diff --git a/src/compose/service-manager.ts b/src/compose/service-manager.ts
new file mode 100644
index 00000000..a8211f69
--- /dev/null
+++ b/src/compose/service-manager.ts
@@ -0,0 +1,660 @@
+import * as Bluebird from 'bluebird';
+import * as Dockerode from 'dockerode';
+import { EventEmitter } from 'events';
+import * as JSONStream from 'JSONStream';
+import * as _ from 'lodash';
+import { fs } from 'mz';
+import StrictEventEmitter from 'strict-event-emitter-types';
+
+import Config from '../config';
+import Docker from '../lib/docker-utils';
+import Logger from '../logger';
+
+import { PermissiveNumber } from '../config/types';
+import constants = require('../lib/constants');
+import {
+	InternalInconsistencyError,
+	NotFoundError,
+	StatusCodeError,
+} from '../lib/errors';
+import * as LogTypes from '../lib/log-types';
+import { checkInt, isValidDeviceName } from '../lib/validation';
+import { Service } from './service';
+
+interface ServiceConstructOpts {
+	docker: Docker;
+	logger: Logger;
+	config: Config;
+}
+
+interface ServiceManagerEvents {
+	change: void;
+}
+type ServiceManagerEventEmitter = StrictEventEmitter<
+	EventEmitter,
+	ServiceManagerEvents
+>;
+
+interface KillOpts {
+	removeContainer?: boolean;
+	wait?: boolean;
+}
+
+export class ServiceManager extends (EventEmitter as {
+	new (): ServiceManagerEventEmitter;
+}) {
+	private docker: Docker;
+	private logger: Logger;
+	private config: Config;
+
+	// Whether a container has died, indexed by ID
+	private containerHasDied: Dictionary<boolean> = {};
+	private listening = false;
+	// Volatile state of containers, indexed by containerId (or random strings if
+	// we don't yet have an id)
+	private volatileState: Dictionary<Partial<Service>> = {};
+
+	public constructor(opts: ServiceConstructOpts) {
+		super();
+		this.docker = opts.docker;
+		this.logger = opts.logger;
+		this.config = opts.config;
+	}
+
+	public async getAll(
+		extraLabelFilters: string | string[] = [],
+	): Promise<Service[]> {
+		const filterLabels = ['supervised'].concat(extraLabelFilters);
+		const containers = await this.listWithBothLabels(filterLabels);
+
+		const services = await Bluebird.map(containers, async container => {
+			try {
+				const serviceInspect = await this.docker
+					.getContainer(container.Id)
+					.inspect();
+				const service = Service.fromDockerContainer(serviceInspect);
+				// We know that the containerId is set below, because `fromDockerContainer`
+				// always sets it
+				const vState = this.volatileState[service.containerId!];
+				if (vState != null && vState.status != null) {
+					service.status = vState.status;
+				}
+				return service;
+			} catch (e) {
+				if (NotFoundError(e)) {
+					return null;
+				}
+				throw e;
+			}
+		});
+
+		return services.filter(s => s != null) as Service[];
+	}
+
+	public async get(service: Service) {
+		const services = (await this.getAll(
+			`service-id=${service.serviceId}`,
+		)).filter(currentService => currentService.isEqualConfig(service));
+
+		if (services.length === 0) {
+			const e: StatusCodeError = new Error(
+				'Could not find a container matching this service definition',
+			);
+			e.statusCode = 404;
+			throw e;
+		}
+		return services[0];
+	}
+
+	public async getStatus() {
+		const services = await this.getAll();
+		const status = _.clone(this.volatileState);
+
+		for (const service of services) {
+			if (service.containerId == null) {
+				throw new InternalInconsistencyError(
+					`containerId not defined in ServiceManager.getStatus: ${service}`,
+				);
+			}
+			if (status[service.containerId] == null) {
+				status[service.containerId] = _.pick(service, [
+					'appId',
+					'imageId',
+					'status',
+					'releaseId',
+					'commit',
+					'createdAt',
+					'serviceName',
+				]) as Partial<Service>;
+			}
+		}
+
+		return _.values(status);
+	}
+
+	public async getByDockerContainerId(
+		containerId: string,
+	): Promise<Service | null> {
+		const container = await this.docker.getContainer(containerId).inspect();
+		if (
+			container.Config.Labels['io.balena.supervised'] == null &&
+			container.Config.Labels['io.resin.supervised'] == null
+		) {
+			return null;
+		}
+		return Service.fromDockerContainer(container);
+	}
+
+	public async updateMetadata(
+		service: Service,
+		metadata: { imageId: number; releaseId: number },
+	) {
+		const svc = await this.get(service);
+		if (svc.containerId == null) {
+			throw new InternalInconsistencyError(
+				`No containerId provided for service ${
+					service.serviceName
+				} in ServiceManager.updateMetadata. Service: ${service}`,
+			);
+		}
+
+		await this.docker.getContainer(svc.containerId).rename({
+			name: `${service.serviceName}_${metadata.imageId}_${metadata.releaseId}`,
+		});
+	}
+
+	public async handover(current: Service, target: Service) {
+		// We set the running container to not restart so that in case of a poweroff
+		// it doesn't come back after boot.
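+		// The sequence is: stop restarting the old container, start the new one
+		// alongside it, wait for the new container to signal that the handover is
+		// complete (or for the handover timeout to expire), and only then kill
+		// the old container.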
+		await this.prepareForHandover(current);
+		await this.start(target);
+		await this.waitToKill(
+			current,
+			target.config.labels['io.balena.update.handover-timeout'],
+		);
+		await this.kill(current);
+	}
+
+	public async killAllLegacy(): Promise<void> {
+		// Containers haven't been normalized (this is an updated supervisor)
+		// so we need to stop and remove them
+		const supervisorImageId = (await this.docker
+			.getImage(constants.supervisorImage)
+			.inspect()).Id;
+
+		for (const container of await this.docker.listContainers({ all: true })) {
+			if (container.ImageID !== supervisorImageId) {
+				await this.killContainer(container.Id, {
+					serviceName: 'legacy',
+				});
+			}
+		}
+	}
+
+	public kill(service: Service, opts: KillOpts = {}) {
+		if (service.containerId == null) {
+			throw new InternalInconsistencyError(
+				`Attempt to kill container without containerId! Service: ${service}`,
+			);
+		}
+		return this.killContainer(service.containerId, service, opts);
+	}
+
+	public async remove(service: Service) {
+		this.logger.logSystemEvent(LogTypes.removeDeadService, { service });
+		const existingService = await this.get(service);
+
+		if (existingService.containerId == null) {
+			throw new InternalInconsistencyError(
+				`No containerId provided for service ${
+					service.serviceName
+				} in ServiceManager.remove. Service: ${service}`,
+			);
+		}
+
+		try {
+			await this.docker
+				.getContainer(existingService.containerId)
+				.remove({ v: true });
+		} catch (e) {
+			if (!NotFoundError(e)) {
+				this.logger.logSystemEvent(LogTypes.removeDeadServiceError, {
+					service,
+					error: e,
+				});
+				throw e;
+			}
+		}
+	}
+
+	public async create(service: Service) {
+		const mockContainerId = this.config.newUniqueKey();
+		try {
+			const existing = await this.get(service);
+			if (existing.containerId == null) {
+				throw new InternalInconsistencyError(
+					`No containerId provided for service ${
+						service.serviceName
+					} in ServiceManager.create. Service: ${service}`,
+				);
+			}
+			return this.docker.getContainer(existing.containerId);
+		} catch (e) {
+			if (!NotFoundError(e)) {
+				this.logger.logSystemEvent(LogTypes.installServiceError, {
+					service,
+					error: e,
+				});
+				throw e;
+			}
+
+			const deviceName = await this.config.get('name');
+			if (!isValidDeviceName(deviceName)) {
+				throw new Error(
+					'The device name contains a newline, which is unsupported by balena. ' +
+						'Please fix the device name.',
+				);
+			}
+
+			const conf = service.toDockerContainer({ deviceName });
+			const nets = service.extraNetworksToJoin();
+
+			this.logger.logSystemEvent(LogTypes.installService, { service });
+			this.reportNewStatus(mockContainerId, service, 'Installing');
+
+			const container = await this.docker.createContainer(conf);
+			service.containerId = container.id;
+
+			await Promise.all(
+				_.map(nets, (endpointConfig, name) =>
+					this.docker.getNetwork(name).connect({
+						Container: container.id,
+						EndpointConfig: endpointConfig,
+					}),
+				),
+			);
+
+			this.logger.logSystemEvent(LogTypes.installServiceSuccess, { service });
+			return container;
+		} finally {
+			this.reportChange(mockContainerId);
+		}
+	}
+
+	public async start(service: Service) {
+		let alreadyStarted = false;
+		let containerId: string | null = null;
+
+		try {
+			const container = await this.create(service);
+			containerId = container.id;
+			this.logger.logSystemEvent(LogTypes.startService, { service });
+
+			this.reportNewStatus(containerId, service, 'Starting');
+
+			let remove = false;
+			let err: Error | undefined;
+			try {
+				await container.start();
+			} catch (e) {
+				// Get the statusCode from the original cause and make sure it's
+				// definitely an int for comparison reasons
+				const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
+				if (maybeStatusCode.isLeft()) {
+					remove = true;
+					err = new Error(
+						`Could not parse status code from docker error: ${e}`,
+					);
+					throw err;
+				}
+				const statusCode = maybeStatusCode.value;
+				const message = e.message;
+
+				// 304 means the container was already started, precisely what we want
+				if (statusCode === 304) {
+					alreadyStarted = true;
+				} else if (
+					statusCode === 500 &&
+					_.isString(message) &&
+					message.trim().match(/exec format error$/)
+				) {
+					// Provide a friendlier error message for "exec format error"
+					const deviceType = await this.config.get('deviceType');
+					remove = true;
+					err = new Error(
+						`Application architecture incompatible with ${deviceType}: exec format error`,
+					);
+					throw err;
+				} else {
+					// rethrow the same error
+					remove = true;
+					err = e;
+					throw e;
+				}
+			} finally {
+				if (remove) {
+					// If starting the container failed, we remove it so that it doesn't litter
+					await container.remove({ v: true }).catch(_.noop);
+					this.logger.logSystemEvent(LogTypes.startServiceError, {
+						service,
+						error: err,
+					});
+				}
+			}
+
+			const serviceId = service.serviceId;
+			const imageId = service.imageId;
+			if (serviceId == null || imageId == null) {
+				throw new InternalInconsistencyError(
+					`serviceId and imageId not defined for service: ${
+						service.serviceName
+					} in ServiceManager.start`,
+				);
+			}
+
+			this.logger.attach(this.docker, container.id, { serviceId, imageId });
+
+			if (alreadyStarted) {
+				this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
+			} else {
+				this.logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
+			}
+
+			service.config.running = true;
+			return container;
+		} finally {
+			if (containerId != null) {
+				this.reportChange(containerId);
+			}
+		}
+	}
+
+	public listenToEvents() {
+		if (this.listening) {
+			return;
+		}
+
+		this.listening = true;
+
+		const listen = async () => {
+			const stream = await this.docker.getEvents({
+				filters: { type: ['container'] },
+			});
+
+			stream.on('error', e => {
+				console.error(`Error on docker events stream:`, e);
+			});
+			const parser = JSONStream.parse();
+			parser.on('data', async (data: { status: string; id: string }) => {
+				if (data != null) {
+					const status = data.status;
+					if (status === 'die' || status === 'start') {
+						try {
+							let service: Service | null = null;
+							try {
+								service = await this.getByDockerContainerId(data.id);
+							} catch (e) {
+								if (!NotFoundError(e)) {
+									throw e;
+								}
+							}
+							if (service != null) {
+								this.emit('change');
+								if (status === 'die') {
+									this.logger.logSystemEvent(LogTypes.serviceExit, { service });
+									this.containerHasDied[data.id] = true;
+								} else if (
+									status === 'start' &&
+									this.containerHasDied[data.id]
+								) {
+									delete this.containerHasDied[data.id];
+									this.logger.logSystemEvent(LogTypes.serviceRestart, {
+										service,
+									});
+
+									const serviceId = service.serviceId;
+									const imageId = service.imageId;
+									if (serviceId == null || imageId == null) {
+										throw new InternalInconsistencyError(
+											`serviceId and imageId not defined for service: ${
+												service.serviceName
+											} in ServiceManager.listenToEvents`,
+										);
+									}
+									this.logger.attach(this.docker, data.id, {
+										serviceId,
+										imageId,
+									});
+								}
+							}
+						} catch (e) {
+							console.error('Error on docker event:', e, e.stack);
+						}
+					}
+				}
+			});
+
+			return new Promise((resolve, reject) => {
+				parser
+					.on('error', (e: Error) => {
+						console.error('Error on docker events stream:', e);
+						reject(e);
+					})
+					.on('end', resolve);
+				stream.pipe(parser);
+			});
+		};
+
+		Bluebird.resolve(listen())
+			.catch(e => {
+				console.error('Error listening to events:', e, e.stack);
+			})
+			.finally(() => {
+				this.listening = false;
+				setTimeout(() => this.listenToEvents(), 1000);
+			});
+
+		return;
+	}
+
+	public async attachToRunning() {
+		const services = await this.getAll();
+		for (const service of services) {
+			if (service.status === 'Running') {
+				this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
+
+				const serviceId = service.serviceId;
+				const imageId = service.imageId;
+				if (serviceId == null || imageId == null) {
+					throw new InternalInconsistencyError(
+						`serviceId and imageId not defined for service: ${
+							service.serviceName
+						} in ServiceManager.attachToRunning`,
+					);
+				}
+
+				if (service.containerId == null) {
+					throw new InternalInconsistencyError(
+						`containerId not defined for service: ${
+							service.serviceName
+						} in ServiceManager.attachToRunning`,
+					);
+				}
+				this.logger.attach(this.docker, service.containerId, {
+					serviceId,
+					imageId,
+				});
+			}
+		}
+	}
+
+	private reportChange(containerId?: string, status?: Partial<Service>) {
+		if (containerId != null) {
+			if (status != null) {
+				if (this.volatileState[containerId] == null) {
+					this.volatileState[containerId] = {};
+				}
+				_.merge(this.volatileState[containerId], status);
+			} else if (this.volatileState[containerId] != null) {
+				delete this.volatileState[containerId];
+			}
+		}
+		this.emit('change');
+	}
+
+	private reportNewStatus(
+		containerId: string,
+		service: Partial<Service>,
+		status: string,
+	) {
+		this.reportChange(
+			containerId,
+			_.merge(
+				{ status },
+				_.pick(service, ['imageId', 'appId', 'releaseId', 'commit']),
+			),
+		);
+	}
+
+	private killContainer(
+		containerId: string,
+		service: Partial<Service> = {},
+		{ removeContainer = true, wait = false }: KillOpts = {},
+	): Bluebird<void> {
+		// To maintain compatibility of the `wait` flag, this function is not
+		// async, but it feels like the decision of whether to wait on the
+		// promise should be made by the caller
+		// TODO: Remove the need for the wait flag
+
+		return Bluebird.try(() => {
+			this.logger.logSystemEvent(LogTypes.stopService, { service });
+			if (service.imageId != null) {
+				this.reportNewStatus(containerId, service, 'Stopping');
+			}
+
+			const containerObj = this.docker.getContainer(containerId);
+			const killPromise = Bluebird.resolve(containerObj.stop())
+				.then(() => {
+					if (removeContainer) {
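+						// `v: true` asks the engine to also remove any anonymous
+						// volumes that were created for this container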
+						return containerObj.remove({ v: true });
+					}
+				})
+				.catch(e => {
+					// Get the statusCode from the original cause and make sure it's
+					// definitely an int for comparison reasons
+					const maybeStatusCode = PermissiveNumber.decode(e.statusCode);
+					if (maybeStatusCode.isLeft()) {
+						throw new Error(
+							`Could not parse status code from docker error: ${e}`,
+						);
+					}
+					const statusCode = maybeStatusCode.value;
+
+					// 304 means the container was already stopped, so we can just remove it
+					if (statusCode === 304) {
+						this.logger.logSystemEvent(LogTypes.stopServiceNoop, { service });
+						// The `.then` above never ran because `stop()` threw, so the
+						// container still needs to be removed here
+						if (removeContainer) {
+							return containerObj.remove({ v: true });
+						}
+					} else if (statusCode === 404) {
+						// 404 means the container doesn't exist, precisely what we want!
+						this.logger.logSystemEvent(LogTypes.stopRemoveServiceNoop, {
+							service,
+						});
+					} else {
+						throw e;
+					}
+				})
+				.tap(() => {
+					delete this.containerHasDied[containerId];
+					this.logger.logSystemEvent(LogTypes.stopServiceSuccess, { service });
+				})
+				.catch(e => {
+					this.logger.logSystemEvent(LogTypes.stopServiceError, {
+						service,
+						error: e,
+					});
+				})
+				.finally(() => {
+					if (service.imageId != null) {
+						this.reportChange(containerId);
+					}
+				});
+
+			if (wait) {
+				return killPromise;
+			}
+			return;
+		});
+	}
+
+	private async listWithBothLabels(
+		labelList: string[],
+	): Promise<Dockerode.ContainerInfo[]> {
+		const listWithPrefix = (prefix: string) =>
+			this.docker.listContainers({
+				all: true,
+				filters: {
+					label: _.map(labelList, v => `${prefix}${v}`),
+				},
+			});
+
+		const [legacy, current] = await Promise.all([
+			listWithPrefix('io.resin.'),
+			listWithPrefix('io.balena.'),
+		]);
+
+		return _.unionBy(legacy, current, 'Id');
+	}
+
+	private async prepareForHandover(service: Service) {
+		const svc = await this.get(service);
+		if (svc.containerId == null) {
+			throw new InternalInconsistencyError(
+				`No containerId provided for service ${
+					service.serviceName
+				} in ServiceManager.prepareForHandover. Service: ${service}`,
+			);
+		}
+		const container = this.docker.getContainer(svc.containerId);
+		await container.update({ RestartPolicy: {} });
+		return await container.rename({
+			name: `old_${service.serviceName}_${service.imageId}_${
+				service.releaseId
+			}`,
+		});
+	}
+
+	private waitToKill(service: Service, timeout: number | string) {
+		const pollInterval = 100;
+		timeout = checkInt(timeout, { positive: true }) || 60000;
+		const deadline = Date.now() + timeout;
+
+		const handoverCompletePaths = service.handoverCompleteFullPathsOnHost();
+
+		const wait = (): Bluebird<void> =>
+			Bluebird.any(
+				handoverCompletePaths.map(file =>
+					fs.stat(file).then(() => fs.unlink(file).catch(_.noop)),
+				),
+			).catch(async () => {
+				if (Date.now() < deadline) {
+					await Bluebird.delay(pollInterval);
+					return wait();
+				} else {
+					console.log(
+						`Handover timeout has passed, assuming handover was completed for service ${
+							service.serviceName
+						}`,
+					);
+				}
+			});
+
+		console.log(
+			`Waiting for handover to be completed for service: ${
+				service.serviceName
+			}`,
+		);
+
+		return wait().then(() => {
+			console.log(`Handover complete for service ${service.serviceName}`);
+		});
+	}
+}
diff --git a/typings/JSONStream.d.ts b/typings/JSONStream.d.ts
new file mode 100644
index 00000000..fe8ab5cf
--- /dev/null
+++ b/typings/JSONStream.d.ts
@@ -0,0 +1 @@
+declare module 'JSONStream';