Merge pull request #750 from resin-io/689-mc-local-mode

Add support for multicontainer local mode
This commit is contained in:
CameronDiver 2018-10-15 10:49:52 +01:00 committed by GitHub
commit 6a89581b2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 696 additions and 288 deletions

View File

@ -37,7 +37,6 @@
"@types/node": "^10.3.1",
"@types/rwlock": "^5.0.2",
"@types/shell-quote": "^1.6.0",
"JSONStream": "^1.1.2",
"blinking": "~0.0.2",
"bluebird": "^3.5.0",
"body-parser": "^1.12.0",

View File

@ -420,8 +420,9 @@ module.exports = class APIBinder
@cachedResinApi._request(requestParams)
_report: =>
@config.getMany([ 'deviceId', 'apiTimeout', 'apiEndpoint', 'uuid' ])
@config.getMany([ 'deviceId', 'apiTimeout', 'apiEndpoint', 'uuid', 'localMode' ])
.then (conf) =>
return if checkTruthy(conf.localMode)
stateDiff = @_getStateDiff()
if _.size(stateDiff) is 0
return

View File

@ -9,6 +9,7 @@ path = require 'path'
constants = require './lib/constants'
Docker = require './lib/docker-utils'
{ LocalModeManager } = require './local-mode'
updateLock = require './lib/update-lock'
{ checkTruthy, checkInt, checkString } = require './lib/validation'
{ NotFoundError } = require './lib/errors'
@ -72,6 +73,7 @@ module.exports = class ApplicationManager extends EventEmitter
@networks = new NetworkManager({ @docker, @logger })
@volumes = new Volumes({ @docker, @logger })
@proxyvisor = new Proxyvisor({ @config, @logger, @db, @docker, @images, applications: this })
@localModeManager = new LocalModeManager(@config, @docker, @logger, @db)
@timeSpentFetching = 0
@fetchesInProgress = 0
@_targetVolatilePerImageId = {}
@ -141,7 +143,9 @@ module.exports = class ApplicationManager extends EventEmitter
saveImage: (step) =>
@images.save(step.image)
cleanup: (step) =>
@images.cleanup()
@config.get('localMode').then (localMode) ->
if !checkTruthy(localMode)
@images.cleanup()
createNetworkOrVolume: (step) =>
if step.model is 'network'
# TODO: These step targets should be the actual compose objects,
@ -179,6 +183,8 @@ module.exports = class ApplicationManager extends EventEmitter
init: =>
@images.cleanupDatabase()
.then =>
@localModeManager.init()
.then =>
@services.attachToRunning()
.then =>
@ -186,9 +192,13 @@ module.exports = class ApplicationManager extends EventEmitter
# Returns the status of applications and their services
getStatus: =>
@config.get('localMode').then (localMode) =>
@_getStatus(localMode)
_getStatus: (localMode) =>
Promise.join(
@services.getStatus()
@images.getStatus()
@images.getStatus(localMode)
@config.get('currentCommit')
@db.models('app').select([ 'appId', 'releaseId', 'commit' ])
(services, images, currentCommit, targetApps) ->
@ -545,7 +555,7 @@ module.exports = class ApplicationManager extends EventEmitter
return null
}
_nextStepForService: ({ current, target }, updateContext) =>
_nextStepForService: ({ current, target }, updateContext, localMode) =>
{ targetApp, networkPairs, volumePairs, installPairs, updatePairs, availableImages, downloading } = updateContext
if current?.status == 'Stopping'
# There is already a kill step in progress for this service, so we wait
@ -555,8 +565,11 @@ module.exports = class ApplicationManager extends EventEmitter
# Dead containers have to be removed
return serviceAction('remove', current.serviceId, current)
needsDownload = !_.some availableImages, (image) =>
image.dockerImageId == target?.config.image or @images.isSameImage(image, { name: target.imageName })
needsDownload = false
# Don't attempt to fetch any images in local mode, they should already be there
if !localMode
needsDownload = !_.some availableImages, (image) =>
image.dockerImageId == target?.config.image or @images.isSameImage(image, { name: target.imageName })
# This service needs an image download but it's currently downloading, so we wait
if needsDownload and target?.imageId in downloading
@ -585,7 +598,7 @@ module.exports = class ApplicationManager extends EventEmitter
timeout = checkInt(target.config.labels['io.resin.update.handover-timeout'])
return @_strategySteps[strategy](current, target, needsDownload, dependenciesMetForStart, dependenciesMetForKill, needsSpecialKill, timeout)
_nextStepsForAppUpdate: (currentApp, targetApp, availableImages = [], downloading = []) =>
_nextStepsForAppUpdate: (currentApp, targetApp, localMode, availableImages = [], downloading = []) =>
emptyApp = { services: [], volumes: {}, networks: {} }
if !targetApp?
targetApp = emptyApp
@ -618,7 +631,7 @@ module.exports = class ApplicationManager extends EventEmitter
# next step for install pairs in download - start order, but start requires dependencies, networks and volumes met
# next step for update pairs in order by update strategy. start requires dependencies, networks and volumes met.
for pair in installPairs.concat(updatePairs)
step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, availableImages, downloading })
step = @_nextStepForService(pair, { targetApp, networkPairs, volumePairs, installPairs, updatePairs, availableImages, downloading }, localMode)
if step?
steps.push(step)
# next step for network pairs - remove requires services killed, create kill if no pairs or steps affect that service
@ -730,8 +743,6 @@ module.exports = class ApplicationManager extends EventEmitter
.tap (appsForDB) =>
Promise.map appsForDB, (app) =>
@db.upsertModel('app', app, { appId: app.appId }, trx)
.then (appsForDB) ->
trx('app').whereNotIn('appId', _.map(appsForDB, 'appId')).del()
.then =>
@proxyvisor.setTargetInTransaction(dependent, trx)
@ -752,7 +763,10 @@ module.exports = class ApplicationManager extends EventEmitter
@_targetVolatilePerImageId[imageId] = {}
getTargetApps: =>
@config.get('apiEndpoint'). then (source = '') =>
@config.getMany(['apiEndpoint', 'localMode']). then ({ apiEndpoint, localMode }) =>
source = apiEndpoint
if checkTruthy(localMode)
source = 'local'
Promise.map(@db.models('app').where({ source }), @normaliseAndExtendAppFromDB)
.map (app) =>
if !_.isEmpty(app.services)
@ -788,8 +802,7 @@ module.exports = class ApplicationManager extends EventEmitter
# imagesToSave: images that
# - are locally available (i.e. an image with the same digest exists)
# - are not saved to the DB with all their metadata (serviceId, serviceName, etc)
_compareImages: (current, target, available) =>
_compareImages: (current, target, available, localMode) =>
allImagesForTargetApp = (app) -> _.map(app.services, imageForService)
allImagesForCurrentApp = (app) ->
_.map app.services, (service) ->
@ -807,9 +820,11 @@ module.exports = class ApplicationManager extends EventEmitter
!_.some available, (availableImage) => @images.isSameImage(availableImage, targetImage)
# Images that are available but we don't have them in the DB with the exact metadata:
imagesToSave = _.filter targetImages, (targetImage) =>
_.some(available, (availableImage) => @images.isSameImage(availableImage, targetImage)) and
!_.some(availableWithoutIds, (img) -> _.isEqual(img, targetImage))
imagesToSave = []
if !localMode
imagesToSave = _.filter targetImages, (targetImage) =>
_.some(available, (availableImage) => @images.isSameImage(availableImage, targetImage)) and
!_.some availableWithoutIds, (img) -> _.isEqual(img, targetImage)
deltaSources = _.map imagesToDownload, (image) =>
return @bestDeltaSource(image, available)
@ -822,13 +837,10 @@ module.exports = class ApplicationManager extends EventEmitter
return { imagesToSave, imagesToRemove }
_inferNextSteps: (cleanupNeeded, availableImages, downloading, supervisorNetworkReady, current, target, ignoreImages, { localMode, delta }) =>
localMode = checkTruthy(localMode)
delta = checkTruthy(delta)
Promise.try =>
delta = checkTruthy(delta)
if checkTruthy(localMode)
target = _.cloneDeep(target)
target.local.apps = _.mapValues target.local.apps, (app) ->
app.services = []
return app
if localMode
ignoreImages = true
currentByAppId = current.local.apps ? {}
targetByAppId = target.local.apps ? {}
@ -839,7 +851,7 @@ module.exports = class ApplicationManager extends EventEmitter
if !ignoreImages and _.isEmpty(downloading)
if cleanupNeeded
nextSteps.push({ action: 'cleanup' })
{ imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages)
{ imagesToRemove, imagesToSave } = @_compareImages(current, target, availableImages, localMode)
for image in imagesToSave
nextSteps.push({ action: 'saveImage', image })
if _.isEmpty(imagesToSave)
@ -849,7 +861,7 @@ module.exports = class ApplicationManager extends EventEmitter
if _.isEmpty(nextSteps)
allAppIds = _.union(_.keys(currentByAppId), _.keys(targetByAppId))
for appId in allAppIds
nextSteps = nextSteps.concat(@_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], availableImages, downloading))
nextSteps = nextSteps.concat(@_nextStepsForAppUpdate(currentByAppId[appId], targetByAppId[appId], localMode, availableImages, downloading))
newDownloads = _.filter(nextSteps, (s) -> s.action == 'fetch').length
if !ignoreImages and delta and newDownloads > 0
downloadsToBlock = downloading.length + newDownloads - constants.maxDeltaDownloads
@ -885,18 +897,32 @@ module.exports = class ApplicationManager extends EventEmitter
@actionExecutors[step.action](step, { force, skipLock })
getRequiredSteps: (currentState, targetState, ignoreImages = false) =>
Promise.join(
@images.isCleanupNeeded()
@images.getAvailable()
@images.getDownloadingImageIds()
@networks.supervisorNetworkReady()
@config.getMany([ 'localMode', 'delta' ])
(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, conf) =>
@_inferNextSteps(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, currentState, targetState, ignoreImages, conf)
.then (nextSteps) =>
if ignoreImages and _.some(nextSteps, action: 'fetch')
throw new Error('Cannot fetch images while executing an API action')
@proxyvisor.getRequiredSteps(availableImages, downloading, currentState, targetState, nextSteps)
.then (proxyvisorSteps) ->
return nextSteps.concat(proxyvisorSteps)
)
@config.get('localMode').then (localMode) =>
Promise.join(
@images.isCleanupNeeded()
@images.getAvailable(localMode)
@images.getDownloadingImageIds()
@networks.supervisorNetworkReady()
@config.get('delta')
(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, delta) =>
conf = { delta, localMode }
if localMode
cleanupNeeded = false
@_inferNextSteps(cleanupNeeded, availableImages, downloading, supervisorNetworkReady, currentState, targetState, ignoreImages, conf)
.then (nextSteps) =>
if ignoreImages and _.some(nextSteps, action: 'fetch')
throw new Error('Cannot fetch images while executing an API action')
@proxyvisor.getRequiredSteps(availableImages, downloading, currentState, targetState, nextSteps)
.then (proxyvisorSteps) ->
return nextSteps.concat(proxyvisorSteps)
)
serviceNameFromId: (serviceId) =>
@getTargetApps().then (apps) ->
# Multi-app warning!
# We assume here that there will only be a single
# application
for appId, app of apps
return _.find app.services, (svc) ->
svc.serviceId == serviceId
.get('serviceName')

View File

@ -2,6 +2,8 @@ import { EventEmitter } from 'events';
import { ServiceAction } from './device-api/common';
import { DeviceApplicationState } from './types/state';
import { Logger } from './logger';
import { EventTracker } from './event-tracker';
import Images = require('./compose/images');
import ServiceManager = require('./compose/service-manager');
@ -31,9 +33,9 @@ export class ApplicationManager extends EventEmitter {
// TODO: When the module which is/declares these fields is converted to
// typecript, type the following
public _lockingIfNecessary: any;
public logger: any;
public logger: Logger;
public deviceState: any;
public eventTracker: any;
public eventTracker: EventTracker;
public services: ServiceManager;
public db: DB;
@ -48,6 +50,8 @@ export class ApplicationManager extends EventEmitter {
public getStatus(): Promise<DeviceApplicationState>;
public serviceNameFromId(serviceId: number): Promise<string>;
}
export default ApplicationManager;

View File

@ -193,9 +193,16 @@ module.exports = class Images extends EventEmitter
@_matchesTagOrDigest(image, dockerImage) or image.dockerImageId == dockerImage.Id
# Gets all images that are supervised, in an object containing name, appId, serviceId, serviceName, imageId, dependent.
getAvailable: =>
getAvailable: (localMode) =>
@_withImagesFromDockerAndDB (dockerImages, supervisedImages) =>
_.filter(supervisedImages, (image) => @_isAvailableInDocker(image, dockerImages))
.then (images) =>
if localMode
# Get all images present on the local daemon which are tagged as local images
return @_getLocalModeImages().then (localImages) ->
images.concat(localImages)
return images
getDownloadingImageIds: =>
Promise.try =>
@ -218,8 +225,8 @@ module.exports = class Images extends EventEmitter
ids = _.map(imagesToRemove, 'id')
@db.models('image').del().whereIn('id', ids)
getStatus: =>
@getAvailable()
getStatus: (localMode) =>
@getAvailable(localMode)
.map (image) ->
image.status = 'Downloaded'
image.downloadProgress = null
@ -304,3 +311,6 @@ module.exports = class Images extends EventEmitter
return image1.name == image2.name or Images.hasSameDigest(image1.name, image2.name)
isSameImage: @isSameImage
_getLocalModeImages: =>
@docker.listImages(filters: label: [ 'io.resin.local.image=1' ])

View File

@ -77,17 +77,21 @@ export class Service {
appConfig = ComposeUtils.camelCaseConfig(appConfig);
const intOrNull = (val: string | number | null | undefined): number | null => {
return checkInt(val) || null;
};
// Seperate the application information from the docker
// container configuration
service.imageId = appConfig.imageId;
service.imageId = intOrNull(appConfig.imageId);
delete appConfig.imageId;
service.serviceName = appConfig.serviceName;
delete appConfig.serviceName;
service.appId = appConfig.appId;
service.appId = intOrNull(appConfig.appId);
delete appConfig.appId;
service.releaseId = appConfig.releaseId;
service.releaseId = intOrNull(appConfig.releaseId);
delete appConfig.releaseId;
service.serviceId = appConfig.serviceId;
service.serviceId = intOrNull(appConfig.serviceId);
delete appConfig.serviceId;
service.imageName = appConfig.imageName;
delete appConfig.imageName;

0
src/device-api/index.ts Normal file
View File

View File

@ -1,15 +1,31 @@
import * as Bluebird from 'bluebird';
import { Request, Response, Router } from 'express';
import * as _ from 'lodash';
import { fs } from 'mz';
import { ApplicationManager } from '../application-manager';
import { Service } from '../compose/service';
import { appNotFoundMessage, serviceNotFoundMessage } from '../lib/messages';
import { checkTruthy } from '../lib/validation';
import { doPurge, doRestart, serviceAction } from './common';
import supervisorVersion = require('../lib/supervisor-version');
export function createV2Api(router: Router, applications: ApplicationManager) {
const { _lockingIfNecessary } = applications;
const { _lockingIfNecessary, deviceState } = applications;
const messageFromError = (err?: Error | string | null): string => {
let message = 'Unknown error';
if (err != null) {
if (_.isError(err) && err.message != null) {
message = err.message;
} else {
message = err as string;
}
}
return message;
};
const handleServiceAction = (
req: Request,
@ -50,17 +66,7 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
});
})
.catch((err) => {
let message;
if (err != null) {
if (err.message != null) {
message = err.message;
} else {
message = err;
}
} else {
message = 'Unknown error';
}
res.status(503).send(message);
res.status(503).send(messageFromError(err));
});
});
};
@ -108,16 +114,7 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
res.status(200).send('OK');
})
.catch((err) => {
let message;
if (err != null) {
message = err.message;
if (message == null) {
message = err;
}
} else {
message = 'Unknown error';
}
res.status(503).send(message);
res.status(503).send(messageFromError(err));
});
});
@ -199,4 +196,111 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
res.status(200).json(apps);
});
});
router.get('/v2/local/target-state', async (_req, res) => {
try {
const localMode = checkTruthy(await deviceState.config.get('localMode'));
if (!localMode) {
return res.status(400).json({
status: 'failed',
message: 'Target state can only be retrieved when in local mode',
});
}
res.status(200).json({
status: 'success',
state: await deviceState.getTarget(),
});
} catch (err) {
res.status(503).send({
status: 'failed',
message: messageFromError(err),
});
}
});
router.post('/v2/local/target-state', async (req, res) => {
// let's first ensure that we're in local mode, otherwise
// this function should not do anything
// TODO: We really should refactor the config module to provide bools
// as bools etc
try {
const localMode = checkTruthy(await deviceState.config.get('localMode'));
if (!localMode) {
return res.status(400).json({
status: 'failed',
message: 'Target state can only set when device is in local mode',
});
}
// Now attempt to set the state
const force = req.body.force;
const targetState = req.body;
try {
await deviceState.setTarget(targetState, true);
await deviceState.triggerApplyTarget({ force });
res.status(200).json({
status: 'success',
message: 'OK',
});
} catch (e) {
res.status(400).json({
status: 'failed',
message: e.message,
});
}
} catch (e) {
const message = 'Could not apply target state: ';
res.status(503).json({
status: 'failed',
message: message + messageFromError(e),
});
}
});
router.get('/v2/local/device-info', async (_req, res) => {
// Return the device type and slug so that local mode builds can use this to
// resolve builds
try {
// FIXME: We should be mounting the following file into the supervisor from the
// start-resin-supervisor script, changed in meta-resin - but until then, hardcode it
const data = await fs.readFile('/mnt/root/resin-boot/device-type.json', 'utf8');
const deviceInfo = JSON.parse(data);
return res.status(200).json({
status: 'sucess',
info: {
arch: deviceInfo.arch,
deviceType: deviceInfo.slug,
},
});
} catch (e) {
const message = 'Could not fetch device information: ';
res.status(503).json({
status: 'failed',
message: message + messageFromError(e),
});
}
});
router.get('/v2/local/logs', async (_req, res) => {
const backend = applications.logger.getLocalBackend();
backend.assignServiceNameResolver(applications.serviceNameFromId.bind(applications));
// Get the stream, and stream it into res
const listenStream = backend.attachListener();
listenStream.pipe(res);
});
router.get('/v2/version', (_req, res) => {
res.status(200).json({
status: 'success',
version: supervisorVersion,
});
});
}

View File

@ -40,6 +40,8 @@ validateState = Promise.method (state) ->
if state.dependent?
validateDependentState(state.dependent)
# TODO (refactor): This shouldn't be here, and instead should be part of the other
# device api stuff in ./device-api
createDeviceStateRouter = (deviceState) ->
router = express.Router()
router.use(bodyParser.urlencoded(extended: true))
@ -238,11 +240,12 @@ module.exports = class DeviceState extends EventEmitter
usingInferStepsLock: (fn) =>
Promise.using @_inferStepsLock, -> fn()
setTarget: (target) ->
setTarget: (target, localSource = false) ->
Promise.join(
@config.get('apiEndpoint'),
validateState(target),
(source) =>
(apiEndpoint) =>
source = apiEndpoint
@usingWriteLockTarget =>
# Apps, deviceConfig, dependent
@db.transaction (trx) =>
@ -251,7 +254,10 @@ module.exports = class DeviceState extends EventEmitter
.then =>
@deviceConfig.setTarget(target.local.config, trx)
.then =>
@applications.setTarget(target.local.apps, target.dependent, source, trx)
if localSource
@applications.setTarget(target.local.apps, target.dependent, 'local', trx)
else
@applications.setTarget(target.local.apps, target.dependent, apiEndpoint, trx)
)
getTarget: ({ initial = false, intermediate = false } = {}) =>
@ -484,6 +490,7 @@ module.exports = class DeviceState extends EventEmitter
triggerApplyTarget: ({ force = false, delay = 0, initial = false } = {}) =>
if @applyInProgress
console.log('==> Apply in progress')
if !@scheduledApply?
@scheduledApply = { force, delay }
else
@ -499,6 +506,7 @@ module.exports = class DeviceState extends EventEmitter
console.log('Applying target state')
@applyTarget({ force, initial })
.finally =>
console.log('==> Done applying target state!')
@applyInProgress = false
@reportCurrentState()
if @scheduledApply?

View File

@ -4,15 +4,19 @@ import * as childProcess from 'child_process';
// The following is exported so that we stub it in the tests
export const execAsync = Promise.promisify(childProcess.exec);
function clearIptablesRule(rule: string): Promise<void> {
return execAsync(`iptables -D ${rule}`).return();
}
function clearAndAppendIptablesRule(rule: string): Promise<void> {
return execAsync(`iptables -D ${rule}`)
return clearIptablesRule(rule)
.catchReturn(null)
.then(() => execAsync(`iptables -A ${rule}`))
.return();
}
function clearAndInsertIptablesRule(rule: string): Promise<void> {
return execAsync(`iptables -D ${rule}`)
return clearIptablesRule(rule)
.catchReturn(null)
.then(() => execAsync(`iptables -I ${rule}`))
.return();
@ -30,3 +34,11 @@ export function rejectOnAllInterfacesExcept(
.catch(() => clearAndAppendIptablesRule(`INPUT -p tcp --dport ${port} -j DROP`))
.return();
}
export function removeRejections(port: number): Promise<void> {
return clearIptablesRule(`INPUT -p tcp --dport ${port} -j REJECT`)
.catchReturn(null)
.then(() => clearIptablesRule(`INPUT -p tcp --dport ${port} -j DROP`))
.catchReturn(null)
.return();
}

86
src/local-mode.ts Normal file
View File

@ -0,0 +1,86 @@
import * as Bluebird from 'bluebird';
import * as Docker from 'dockerode';
import * as _ from 'lodash';
import Config = require('./config');
import Database = require('./db');
import { checkTruthy } from './lib/validation';
import { Logger } from './logger';
/**
* This class handles any special cases necessary for switching
* modes in localMode.
*
* Currently this is needed because of inconsistencies in the way
* that the state machine handles local mode and normal operation.
* We should aim to remove these inconsistencies in the future.
*/
export class LocalModeManager {
public constructor(
public config: Config,
public docker: Docker,
public logger: Logger,
public db: Database,
) { }
public async init() {
// Setup a listener to catch state changes relating to local mode
this.config.on('change', (changed) => {
if (changed.localMode != null) {
const localMode = checkTruthy(changed.localMode) || false;
// First switch the logger to it's correct state
this.logger.switchBackend(localMode);
// If we're leaving local mode, make sure to remove all of the
// leftover artifacts
if (!localMode) {
this.removeLocalModeArtifacts();
}
}
});
const localMode = checkTruthy(await this.config.get('localMode') || false);
if (!localMode) {
// Remove any leftovers if necessary
await this.removeLocalModeArtifacts();
}
}
public async removeLocalModeArtifacts(): Promise<void> {
try {
const images = await this.getLocalModeImages();
const containers = await this.getLocalModeContainers(images);
await Bluebird.map(containers, (containerId) => {
console.log('Removing local mode container: ', containerId);
return this.docker.getContainer(containerId).remove({ force: true });
});
await Bluebird.map(images, (imageId) => {
console.log('Removing local mode image: ', imageId);
return this.docker.getImage(imageId).remove({ force: true });
});
// Remove any local mode state added to the database
await this.db.models('app').del().where({ source: 'local' });
} catch (e) {
console.log('There was an error clearing local mode artifacts: ', e);
}
}
private async getLocalModeImages(): Promise<string[]> {
// Return all local mode images present on the local docker daemon
return _.map(await this.docker.listImages({ filters: { label: [ 'io.resin.local.image=1' ] } }), 'Id');
}
private async getLocalModeContainers(localModeImageIds: string[]): Promise<string[]> {
return _(await this.docker.listContainers())
.filter(({ Image }) => _.includes(localModeImageIds, Image))
.map('Id')
.value();
}
}
export default LocalModeManager;

View File

@ -1,202 +1,17 @@
import * as Bluebird from 'bluebird';
import * as es from 'event-stream';
import { ClientRequest } from 'http';
import * as https from 'https';
import * as _ from 'lodash';
import * as Lock from 'rwlock';
import * as stream from 'stream';
import * as url from 'url';
import * as zlib from 'zlib';
import { EventTracker } from './event-tracker';
import Docker = require('./lib/docker-utils');
import { LogType } from './lib/log-types';
const ZLIB_TIMEOUT = 100;
const COOLDOWN_PERIOD = 5 * 1000;
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;
const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;
interface Options extends url.UrlWithParsedQuery {
method: string;
headers: Dictionary<string>;
}
type LogMessage = Dictionary<any>;
abstract class LogBackend {
public offlineMode: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
}
class ResinLogBackend extends LogBackend {
private req: ClientRequest | null = null;
private dropCount: number = 0;
private writable: boolean = true;
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
timeout: NodeJS.Timer;
public constructor(
apiEndpoint: string,
uuid: string,
deviceApiKey: string,
) {
super();
this.opts = url.parse(`${apiEndpoint}/device/v2/${uuid}/log-stream`) as any;
this.opts.method = 'POST';
this.opts.headers = {
Authorization: `Bearer ${deviceApiKey}`,
'Content-Type': 'application/x-ndjson',
'Content-Encoding': 'gzip',
};
// This stream serves serves as a message buffer during reconnections
// while we unpipe the old, malfunctioning connection and then repipe a
// new one.
this.stream = new stream.PassThrough({
allowHalfOpen: true,
// We halve the high watermark because a passthrough stream has two
// buffers, one for the writable and one for the readable side. The
// write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2,
});
this.stream.on('drain', () => {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
isStdErr: true,
});
this.dropCount = 0;
}
});
}
public log(message: LogMessage) {
if (this.offlineMode || !this.publishEnabled) {
return;
}
if (!_.isObject(message)) {
return;
}
message = _.assign({
timestamp: Date.now(),
message: '',
}, message);
if (!message.isSystem && message.serviceId == null) {
return;
}
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
}
private setup = _.throttle(() => {
this.req = https.request(this.opts);
// Since we haven't sent the request body yet, and never will,the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
console.log('LogBackend: server responded with status code:', res.statusCode);
this.teardown();
});
this.req.on('timeout', () => this.teardown());
this.req.on('close', () => this.teardown());
this.req.on('error', (err) => {
console.log('LogBackend: unexpected error:', err);
this.teardown();
});
// Immediately flush the headers. This gives a chance to the server to
// respond with potential errors such as 401 authentication error
this.req.flushHeaders();
// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', () => this.teardown());
this.gzip.pipe(this.req);
// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentialy lose logs if it turned out that
// the server is unavailalbe because @_req stream would consume our
// passthrough buffer
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.stream.pipe(this.gzip);
this.flush();
}
}, RESPONSE_GRACE_PERIOD);
}, COOLDOWN_PERIOD);
private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
// Flushing every ZLIB_TIMEOUT hits a balance between compression and
// latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x
// whereas when ZLIB_TIMEOUT is infinity the compession ratio is around 10x.
private flush = _.throttle(() => {
if (this.gzip != null) {
this.gzip.flush(zlib.Z_SYNC_FLUSH);
}
}, ZLIB_TIMEOUT, { leading: false });
private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
}
this.req = null;
}
}
private write(message: LogMessage) {
this.snooze();
if (this.req == null) {
this.setup();
}
if (this.writable) {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} else {
this.dropCount += 1;
}
}
}
import {
LocalLogBackend,
LogBackend,
LogMessage,
ResinLogBackend,
} from './logging-backends';
interface LoggerSetupOptions {
apiEndpoint: string;
@ -204,6 +19,7 @@ interface LoggerSetupOptions {
deviceApiKey: string;
offlineMode: boolean;
enableLogs: boolean;
localMode: boolean;
}
type LogEventObject = Dictionary<any> | null;
@ -221,7 +37,11 @@ export class Logger {
private writeLock: (key: string) => Bluebird<() => void> = Bluebird.promisify(
new Lock().async.writeLock,
);
private backend: LogBackend | null = null;
private resinBackend: ResinLogBackend | null = null;
private localBackend: LocalLogBackend | null = null;
private eventTracker: EventTracker;
private attached: {
[key in OutputStream]: { [containerId: string]: boolean }
@ -241,13 +61,40 @@ export class Logger {
deviceApiKey,
offlineMode,
enableLogs,
localMode,
}: LoggerSetupOptions,
) {
this.backend = new ResinLogBackend(apiEndpoint, uuid, deviceApiKey);
this.resinBackend = new ResinLogBackend(apiEndpoint, uuid, deviceApiKey);
this.localBackend = new LocalLogBackend();
this.backend = localMode ? this.localBackend : this.resinBackend;
this.backend.offlineMode = offlineMode;
this.backend.publishEnabled = enableLogs;
}
public switchBackend(localMode: boolean) {
if (localMode) {
// Use the local mode backend
this.backend = this.localBackend;
console.log('Switching logging backend to LocalLogBackend');
} else {
// Use the resin backend
this.backend = this.resinBackend;
console.log('Switching logging backend to ResinLogBackend');
}
}
public getLocalBackend(): LocalLogBackend {
// TODO: Think about this interface a little better, it would be
// nicer to proxy the logs via the logger module
if (this.localBackend == null) {
// TODO: Type this as an internal inconsistency error
throw new Error('Local backend logger is not defined.');
}
return this.localBackend;
}
public enable(value: boolean = true) {
if (this.backend != null) {
this.backend.publishEnabled = value;
@ -349,27 +196,27 @@ export class Logger {
docker: Docker,
streamType: OutputStream,
containerId: string,
{ serviceId, imageId }: { serviceId: string, imageId: string},
{ serviceId, imageId }: { serviceId: string, imageId: string },
): Bluebird<void> {
return Bluebird.try(() => {
if (this.attached[streamType][containerId]) {
return;
}
return Bluebird.try(() => {
if (this.attached[streamType][containerId]) {
return;
}
const logsOpts = {
follow: true,
stdout: streamType === OutputStream.Stdout,
stderr: streamType === OutputStream.Stderr,
timestamps: true,
since: Math.floor(Date.now() / 1000),
};
const logsOpts = {
follow: true,
stdout: streamType === OutputStream.Stdout,
stderr: streamType === OutputStream.Stderr,
timestamps: true,
since: Math.floor(Date.now() / 1000),
};
return docker.getContainer(containerId).logs(logsOpts)
.then((stream) => {
this.attached[streamType][containerId] = true;
return docker.getContainer(containerId).logs(logsOpts)
.then((stream) => {
this.attached[streamType][containerId] = true;
stream
stream
.on('error', (err) => {
console.error('Error on container logs', err);
this.attached[streamType][containerId] = false;
@ -400,9 +247,9 @@ export class Logger {
.on('end', () => {
this.attached[streamType][containerId] = false;
});
});
});
});
});
}
@ -451,8 +298,12 @@ export class Logger {
const logLine = msgBuf.toString();
const space = logLine.indexOf(' ');
if (space > 0) {
let timestamp = (new Date(logLine.substr(0, space))).getTime();
if (_.isNaN(timestamp)) {
timestamp = Date.now();
}
return {
timestamp: (new Date(logLine.substr(0, space))).getTime(),
timestamp,
message: logLine.substr(space + 1),
};
}

View File

@ -0,0 +1,10 @@
import { LocalLogBackend } from './local-backend';
import { LogBackend, LogMessage } from './log-backend';
import { ResinLogBackend } from './resin-backend';
export {
LocalLogBackend,
LogBackend,
LogMessage,
ResinLogBackend,
};

View File

@ -0,0 +1,71 @@
import * as Bluebird from 'bluebird';
import * as _ from 'lodash';
import { Readable } from 'stream';
import { checkInt } from '../lib/validation';
import { LogBackend, LogMessage } from './log-backend';
export class LocalLogBackend extends LogBackend {
private globalListeners: Readable[] = [];
private serviceNameResolver: (serviceId: number) => Bluebird<string>;
public log(message: LogMessage): void {
if (this.publishEnabled) {
Bluebird.try(() => {
if (!message.isSystem) {
if (this.serviceNameResolver == null) {
// This means there is no listener assigned, drop the logs
// TODO: Store these, and resolve them when a listener is attached
return null;
}
const svcId = checkInt(message.serviceId);
if (svcId == null) {
console.log('Warning: Non-integer service id found in local logs: ');
console.log(` ${JSON.stringify(message)}`);
return null;
}
// TODO: Can we cache this value? The service ids are reused, so
// we would need a way of invalidating the cache
return this.serviceNameResolver(svcId).then((serviceName) => {
return _.assign({}, { serviceName }, message);
});
} else {
return message;
}
})
.then((message: LogMessage | null) => {
if (message != null) {
_.each(this.globalListeners, (listener) => {
listener.push(`${JSON.stringify(message)}\n`);
});
}
})
.catch((e) => {
console.log('Error streaming local log output: ', e);
});
}
}
/**
* Get a stream which will be populated with log messages from
* local mode services and the supervisor
*/
public attachListener(): Readable {
const stream = new Readable({
// We don't actually need to do anything here
read: _.noop,
});
this.globalListeners.push(stream);
return stream;
}
public assignServiceNameResolver(resolver: (serviceId: number) => Bluebird<string>) {
this.serviceNameResolver = resolver;
}
}
export default LocalLogBackend;

View File

@ -0,0 +1,11 @@
export type LogMessage = Dictionary<any>;
export abstract class LogBackend {
public offlineMode: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
}
export default LogBackend;

View File

@ -0,0 +1,185 @@
import { ClientRequest } from 'http';
import * as https from 'https';
import * as _ from 'lodash';
import * as stream from 'stream';
import * as url from 'url';
import * as zlib from 'zlib';
import { LogBackend, LogMessage } from './log-backend';
const ZLIB_TIMEOUT = 100;
const COOLDOWN_PERIOD = 5 * 1000;
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;
const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;
interface Options extends url.UrlWithParsedQuery {
method: string;
headers: Dictionary<string>;
}
export class ResinLogBackend extends LogBackend {
private req: ClientRequest | null = null;
private dropCount: number = 0;
private writable: boolean = true;
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
timeout: NodeJS.Timer;
public constructor(
apiEndpoint: string,
uuid: string,
deviceApiKey: string,
) {
super();
this.opts = url.parse(`${apiEndpoint}/device/v2/${uuid}/log-stream`) as any;
this.opts.method = 'POST';
this.opts.headers = {
Authorization: `Bearer ${deviceApiKey}`,
'Content-Type': 'application/x-ndjson',
'Content-Encoding': 'gzip',
};
// This stream serves serves as a message buffer during reconnections
// while we unpipe the old, malfunctioning connection and then repipe a
// new one.
this.stream = new stream.PassThrough({
allowHalfOpen: true,
// We halve the high watermark because a passthrough stream has two
// buffers, one for the writable and one for the readable side. The
// write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2,
});
this.stream.on('drain', () => {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
isStdErr: true,
});
this.dropCount = 0;
}
});
}
public log(message: LogMessage) {
if (this.offlineMode || !this.publishEnabled) {
return;
}
if (!_.isObject(message)) {
return;
}
message = _.assign({
timestamp: Date.now(),
message: '',
}, message);
if (!message.isSystem && message.serviceId == null) {
return;
}
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
}
private setup = _.throttle(() => {
this.req = https.request(this.opts);
// Since we haven't sent the request body yet, and never will,the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
console.log('LogBackend: server responded with status code:', res.statusCode);
this.teardown();
});
this.req.on('timeout', () => this.teardown());
this.req.on('close', () => this.teardown());
this.req.on('error', (err) => {
console.log('LogBackend: unexpected error:', err);
this.teardown();
});
// Immediately flush the headers. This gives a chance to the server to
// respond with potential errors such as 401 authentication error
this.req.flushHeaders();
// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', () => this.teardown());
this.gzip.pipe(this.req);
// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentialy lose logs if it turned out that
// the server is unavailalbe because @_req stream would consume our
// passthrough buffer
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.stream.pipe(this.gzip);
this.flush();
}
}, RESPONSE_GRACE_PERIOD);
}, COOLDOWN_PERIOD);
private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
// Flushing every ZLIB_TIMEOUT hits a balance between compression and
// latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x
// whereas when ZLIB_TIMEOUT is infinity the compession ratio is around 10x.
private flush = _.throttle(() => {
if (this.gzip != null) {
this.gzip.flush(zlib.Z_SYNC_FLUSH);
}
}, ZLIB_TIMEOUT, { leading: false });
private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
}
this.req = null;
}
}
private write(message: LogMessage) {
this.snooze();
if (this.req == null) {
this.setup();
}
if (this.writable) {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} else {
this.dropCount += 1;
}
}
}

View File

@ -68,10 +68,33 @@ module.exports = class SupervisorAPI
@_api.use(router)
listen: (allowedInterfaces, port, apiTimeout) =>
iptables.rejectOnAllInterfacesExcept(allowedInterfaces, port)
@config.get('localMode').then (localMode) =>
@applyListeningRules(checkTruthy(localMode), port, allowedInterfaces)
.then =>
# Monitor the switching of local mode, and change which interfaces will
# be listented to based on that
@config.on 'change', (changedConfig) =>
if changedConfig.localMode?
@applyListeningRules(changedConfig.localMode, port, allowedInterfaces)
.then =>
@server = @_api.listen(port)
@server.timeout = apiTimeout
applyListeningRules: (allInterfaces, port, allowedInterfaces) =>
Promise.try ->
if checkTruthy(allInterfaces)
iptables.removeRejections(port).then ->
console.log('Supervisor API listening on all interfaces')
else
iptables.rejectOnAllInterfacesExcept(allowedInterfaces, port).then ->
console.log('Supervisor API listening on allowed interfaces only')
.catch (e) =>
# If there's an error, stop the supervisor api from answering any endpoints,
# and this will eventually be restarted by the healthcheck
console.log('Error on switching supervisor API listening rules - stopping API.')
console.log(' ', e)
if @server?
@stop()
stop: ->
@server.close()

View File

@ -22,6 +22,7 @@ startupConfigFields = [
'mixpanelToken'
'mixpanelHost'
'loggingEnabled'
'localMode'
]
module.exports = class Supervisor extends EventEmitter
@ -57,6 +58,7 @@ module.exports = class Supervisor extends EventEmitter
deviceApiKey: conf.deviceApiKey,
offlineMode: checkTruthy(conf.offlineMode),
enableLogs: checkTruthy(conf.loggingEnabled),
localMode: checkTruthy(conf.localMode)
})
.then =>
@deviceState.init()

View File

@ -33,6 +33,7 @@ describe 'Logger', ->
deviceApiKey: 'secretkey'
offlineMode: false
enableLogs: true
localMode: false
})
afterEach ->