diff --git a/src/application-manager.coffee b/src/application-manager.coffee index 963b9651..dacfb01d 100644 --- a/src/application-manager.coffee +++ b/src/application-manager.coffee @@ -75,6 +75,19 @@ module.exports = class ApplicationManager extends EventEmitter @_targetVolatilePerImageId = {} @_containerStarted = {} + # Rather than relying on removing out of data database entries when we're no + # longer using them, set a task that runs periodically to clear out the database + # This has the advantage that if for some reason a container is removed while the + # supervisor is down, we won't have zombie entries in the db + setInterval => + @docker.listContainers(all: true).then (containers) => + @logger.clearOutOfDateDBLogs(_.map(containers, 'Id')) + # Once a day + , 1000 * 60 * 60 * 24 + # But also run it in the background on startup + @docker.listContainers(all: true).then (containers) => + @logger.clearOutOfDateDBLogs(_.map(containers, 'Id')) + @config.on 'change', (changedConfig) => if changedConfig.appUpdatePollInterval @images.appUpdatePollInterval = changedConfig.appUpdatePollInterval diff --git a/src/compose/service-manager.ts b/src/compose/service-manager.ts index 24f76732..6cc78ad2 100644 --- a/src/compose/service-manager.ts +++ b/src/compose/service-manager.ts @@ -364,9 +364,7 @@ export class ServiceManager extends (EventEmitter as { this.logger.attach(this.docker, container.id, { serviceId, imageId }); - if (alreadyStarted) { - this.logger.logSystemEvent(LogTypes.startServiceNoop, { service }); - } else { + if (!alreadyStarted) { this.logger.logSystemEvent(LogTypes.startServiceSuccess, { service }); } @@ -471,8 +469,6 @@ export class ServiceManager extends (EventEmitter as { const services = await this.getAll(); for (const service of services) { if (service.status === 'Running') { - this.logger.logSystemEvent(LogTypes.startServiceNoop, { service }); - const serviceId = service.serviceId; const imageId = service.imageId; if (serviceId == null || imageId == null) { diff --git a/src/lib/log-types.ts b/src/lib/log-types.ts index ae208702..328d0a0d 100644 --- a/src/lib/log-types.ts +++ b/src/lib/log-types.ts @@ -92,10 +92,6 @@ export const startServiceSuccess: LogType = { eventName: 'Service started', humanName: 'Started service', }; -export const startServiceNoop: LogType = { - eventName: 'Service already running', - humanName: 'Service is already running', -}; export const startServiceError: LogType = { eventName: 'Service start error', humanName: 'Failed to start service', diff --git a/src/logger.ts b/src/logger.ts index 8ae89882..398b04f4 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,17 +1,18 @@ import * as Bluebird from 'bluebird'; -import * as es from 'event-stream'; import * as _ from 'lodash'; +import DB from './db'; import { EventTracker } from './event-tracker'; import Docker from './lib/docker-utils'; import { LogType } from './lib/log-types'; import { writeLock } from './lib/update-lock'; import { BalenaLogBackend, + ContainerLogs, LocalLogBackend, LogBackend, LogMessage, -} from './logging-backends'; +} from './logging'; interface LoggerSetupOptions { apiEndpoint: string; @@ -24,12 +25,8 @@ interface LoggerSetupOptions { type LogEventObject = Dictionary | null; -enum OutputStream { - Stdout, - Stderr, -} - interface LoggerConstructOptions { + db: DB; eventTracker: EventTracker; } @@ -39,16 +36,13 @@ export class Logger { private localBackend: LocalLogBackend | null = null; private eventTracker: EventTracker; - private attached: { - [key in OutputStream]: { [containerId: string]: boolean } - } = { - [OutputStream.Stderr]: {}, - [OutputStream.Stdout]: {}, - }; + private db: DB; + private containerLogs: { [containerId: string]: ContainerLogs } = {}; - public constructor({ eventTracker }: LoggerConstructOptions) { + public constructor({ db, eventTracker }: LoggerConstructOptions) { this.backend = null; this.eventTracker = eventTracker; + this.db = db; } public init({ @@ -139,20 +133,47 @@ export class Logger { containerId: string, serviceInfo: { serviceId: number; imageId: number }, ): Bluebird { - return Bluebird.using(this.lock(containerId), () => { - return this.attachStream( - docker, - OutputStream.Stdout, - containerId, - serviceInfo, - ).then(() => { - return this.attachStream( - docker, - OutputStream.Stderr, - containerId, - serviceInfo, - ); + // First detect if we already have an attached log stream + // for this container + if (containerId in this.containerLogs) { + return Bluebird.resolve(); + } + + return Bluebird.using(this.lock(containerId), async () => { + const logs = new ContainerLogs(containerId, docker); + this.containerLogs[containerId] = logs; + logs.on('error', err => { + console.error(`Container log retrieval error: ${err}`); + delete this.containerLogs[containerId]; }); + logs.on('log', async logMessage => { + this.log(_.merge({}, serviceInfo, logMessage)); + + // Take the timestamp and set it in the database as the last + // log sent for this + await this.db + .models('containerLogs') + .where({ containerId }) + .update({ lastSentTimestamp: logMessage.timestamp }); + }); + + logs.on('closed', () => delete this.containerLogs[containerId]); + + // Get the timestamp of the last sent log for this container + let [timestampObj] = await this.db + .models('containerLogs') + .select('lastSentTimestamp') + .where({ containerId }); + + if (timestampObj == null) { + timestampObj = { lastSentTimestamp: 0 }; + // Create the row so we have something to update + await this.db + .models('containerLogs') + .insert({ containerId, lastSentTimestamp: 0 }); + } + const { lastSentTimestamp } = timestampObj; + return logs.attach(lastSentTimestamp); }); } @@ -200,67 +221,12 @@ export class Logger { this.logSystemMessage(message, obj, eventName); } - // TODO: This function interacts with the docker daemon directly, - // using the container id, but it would be better if this was provided - // by the Compose/Service-Manager module, as an accessor - private attachStream( - docker: Docker, - streamType: OutputStream, - containerId: string, - { serviceId, imageId }: { serviceId: number; imageId: number }, - ): Bluebird { - return Bluebird.try(() => { - if (this.attached[streamType][containerId]) { - return; - } - - const logsOpts = { - follow: true, - stdout: streamType === OutputStream.Stdout, - stderr: streamType === OutputStream.Stderr, - timestamps: true, - since: Math.floor(Date.now() / 1000), - }; - - return docker - .getContainer(containerId) - .logs(logsOpts) - .then(stream => { - this.attached[streamType][containerId] = true; - - stream - .on('error', err => { - console.error('Error on container logs', err); - this.attached[streamType][containerId] = false; - }) - .pipe(es.split()) - .on('data', (logBuf: Buffer | string) => { - if (_.isString(logBuf)) { - logBuf = Buffer.from(logBuf); - } - const logMsg = Logger.extractContainerMessage(logBuf); - if (logMsg != null) { - const message: LogMessage = { - message: logMsg.message, - timestamp: logMsg.timestamp, - serviceId, - imageId, - }; - if (streamType === OutputStream.Stderr) { - message.isStdErr = true; - } - this.log(message); - } - }) - .on('error', err => { - console.error('Error on container logs', err); - this.attached[streamType][containerId] = false; - }) - .on('end', () => { - this.attached[streamType][containerId] = false; - }); - }); - }); + public async clearOutOfDateDBLogs(containerIds: string[]) { + console.log('Performing database cleanup for container log timestamps'); + await this.db + .models('containerLogs') + .whereNotIn('containerId', containerIds) + .delete(); } private objectNameForLogs(eventObj: LogEventObject): string | null { @@ -294,30 +260,6 @@ export class Logger { return null; } - - private static extractContainerMessage( - msgBuf: Buffer, - ): { message: string; timestamp: number } | null { - // Non-tty message format from: - // https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach - if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) { - // Take the header from this message, and parse it as normal - msgBuf = msgBuf.slice(8); - } - const logLine = msgBuf.toString(); - const space = logLine.indexOf(' '); - if (space > 0) { - let timestamp = new Date(logLine.substr(0, space)).getTime(); - if (_.isNaN(timestamp)) { - timestamp = Date.now(); - } - return { - timestamp, - message: logLine.substr(space + 1), - }; - } - return null; - } } export default Logger; diff --git a/src/logging-backends/balena-backend.ts b/src/logging/balena-backend.ts similarity index 100% rename from src/logging-backends/balena-backend.ts rename to src/logging/balena-backend.ts diff --git a/src/logging/container.ts b/src/logging/container.ts new file mode 100644 index 00000000..bb20a3af --- /dev/null +++ b/src/logging/container.ts @@ -0,0 +1,101 @@ +import * as es from 'event-stream'; +import { EventEmitter } from 'events'; +import * as _ from 'lodash'; +import * as Stream from 'stream'; +import StrictEventEmitter from 'strict-event-emitter-types'; + +import Docker from '../lib/docker-utils'; + +export interface ContainerLog { + message: string; + timestamp: number; + isStdout: boolean; +} + +interface LogsEvents { + log: ContainerLog; + closed: void; + error: Error; +} + +type LogsEventEmitter = StrictEventEmitter; + +export class ContainerLogs extends (EventEmitter as { + new (): LogsEventEmitter; +}) { + public constructor(public containerId: string, private docker: Docker) { + super(); + } + + public async attach(lastSentTimestamp: number) { + const logOpts = { + follow: true, + timestamps: true, + since: Math.floor(lastSentTimestamp / 1000), + }; + const stdoutLogOpts = { stdout: true, stderr: false, ...logOpts }; + const stderrLogOpts = { stderr: true, stdout: false, ...logOpts }; + + const container = this.docker.getContainer(this.containerId); + const stdoutStream = await container.logs(stdoutLogOpts); + const stderrStream = await container.logs(stderrLogOpts); + + [[stdoutStream, true], [stderrStream, false]].forEach( + ([stream, isStdout]: [Stream.Readable, boolean]) => { + stream + .on('error', err => { + this.emit( + 'error', + new Error(`Error on container logs: ${err} ${err.stack}`), + ); + }) + .pipe(es.split()) + .on('data', (logBuf: Buffer | string) => { + if (_.isString(logBuf)) { + logBuf = Buffer.from(logBuf); + } + const logMsg = ContainerLogs.extractMessage(logBuf); + if (logMsg != null) { + this.emit('log', { isStdout, ...logMsg }); + } + }) + .on('error', err => { + this.emit( + 'error', + new Error(`Error on container logs: ${err} ${err.stack}`), + ); + }) + .on('end', () => this.emit('closed')); + }, + ); + } + + private static extractMessage( + msgBuf: Buffer, + ): { message: string; timestamp: number } | undefined { + // Non-tty message format from: + // https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach + if ( + _.includes([0, 1, 2], msgBuf[0]) && + _.every(msgBuf.slice(1, 7), c => c === 0) + ) { + // Take the header from this message, and parse it as normal + msgBuf = msgBuf.slice(8); + } + const logLine = msgBuf.toString(); + const space = logLine.indexOf(' '); + if (space > 0) { + let timestamp = new Date(logLine.substr(0, space)).getTime(); + if (_.isNaN(timestamp)) { + timestamp = Date.now(); + } + return { + timestamp, + message: logLine.substr(space + 1), + }; + } + return; + } +} + +export default ContainerLogs; diff --git a/src/logging-backends/index.ts b/src/logging/index.ts similarity index 54% rename from src/logging-backends/index.ts rename to src/logging/index.ts index e39dd9e6..e341845a 100644 --- a/src/logging-backends/index.ts +++ b/src/logging/index.ts @@ -1,5 +1,12 @@ +import { BalenaLogBackend } from './balena-backend'; +import ContainerLogs from './container'; import { LocalLogBackend } from './local-backend'; import { LogBackend, LogMessage } from './log-backend'; -import { BalenaLogBackend } from './balena-backend'; -export { LocalLogBackend, LogBackend, LogMessage, BalenaLogBackend }; +export { + ContainerLogs, + LocalLogBackend, + LogBackend, + LogMessage, + BalenaLogBackend, +}; diff --git a/src/logging-backends/local-backend.ts b/src/logging/local-backend.ts similarity index 100% rename from src/logging-backends/local-backend.ts rename to src/logging/local-backend.ts diff --git a/src/logging-backends/log-backend.ts b/src/logging/log-backend.ts similarity index 100% rename from src/logging-backends/log-backend.ts rename to src/logging/log-backend.ts diff --git a/src/migrations/M00004.js b/src/migrations/M00004.js new file mode 100644 index 00000000..a9c4ba5e --- /dev/null +++ b/src/migrations/M00004.js @@ -0,0 +1,10 @@ +exports.up = function(knex) { + return knex.schema.createTable('containerLogs', table => { + table.string('containerId'); + table.integer('lastSentTimestamp'); + }); +}; + +exports.down = function(knex, Promise) { + return Promise.reject(new Error('Not Implemented')); +}; diff --git a/src/supervisor.coffee b/src/supervisor.coffee index 4a7aa229..a0ddd49f 100644 --- a/src/supervisor.coffee +++ b/src/supervisor.coffee @@ -30,7 +30,7 @@ module.exports = class Supervisor extends EventEmitter @db = new DB() @config = new Config({ @db }) @eventTracker = new EventTracker() - @logger = new Logger({ @eventTracker }) + @logger = new Logger({ @db, @eventTracker }) @deviceState = new DeviceState({ @config, @db, @eventTracker, @logger }) @apiBinder = new APIBinder({ @config, @db, @deviceState, @eventTracker }) @@ -60,12 +60,16 @@ module.exports = class Supervisor extends EventEmitter .then => @config.getMany(startupConfigFields) .then (conf) => + # We can't print to the dashboard until the logger has started up, + # so we leave a trail of breadcrumbs in the logs in case runtime + # fails to get to the first dashboard logs + console.log('Starting event tracker') @eventTracker.init(conf) .then => - @eventTracker.track('Supervisor start') - .then => + console.log('Starting up api binder') @apiBinder.initClient() .then => + console.log('Starting logging infrastructure') @logger.init({ apiEndpoint: conf.apiEndpoint, uuid: conf.uuid, @@ -74,6 +78,8 @@ module.exports = class Supervisor extends EventEmitter enableLogs: conf.loggingEnabled, localMode: conf.localMode }) + .then => + @logger.logSystemMessage('Supervisor starting', {}, 'Supervisor start') .then => if conf.legacyAppsPresent console.log('Legacy app detected, running migration') diff --git a/test/12-logger.spec.coffee b/test/12-logger.spec.coffee index 8daea577..a6167591 100644 --- a/test/12-logger.spec.coffee +++ b/test/12-logger.spec.coffee @@ -8,6 +8,7 @@ m = require 'mochainon' { stub } = m.sinon { Logger } = require '../src/logger' +{ ContainerLogs } = require '../src/logging/container' describe 'Logger', -> beforeEach -> @_req = new stream.PassThrough() @@ -111,7 +112,7 @@ describe 'Logger', -> message = '\u0001\u0000\u0000\u0000\u0000\u0000\u0000?2018-09-21T12:37:09.819134000Z this is the message' buffer = Buffer.from(message) - expect(Logger.extractContainerMessage(buffer)).to.deep.equal({ + expect(ContainerLogs.extractMessage(buffer)).to.deep.equal({ message: 'this is the message', timestamp: 1537533429819 })