diff --git a/src/logger.ts b/src/logger.ts index 8ae89882..b093bc1b 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,5 +1,4 @@ import * as Bluebird from 'bluebird'; -import * as es from 'event-stream'; import * as _ from 'lodash'; import { EventTracker } from './event-tracker'; @@ -8,10 +7,11 @@ import { LogType } from './lib/log-types'; import { writeLock } from './lib/update-lock'; import { BalenaLogBackend, + ContainerLogs, LocalLogBackend, LogBackend, LogMessage, -} from './logging-backends'; +} from './logging'; interface LoggerSetupOptions { apiEndpoint: string; @@ -24,11 +24,6 @@ interface LoggerSetupOptions { type LogEventObject = Dictionary | null; -enum OutputStream { - Stdout, - Stderr, -} - interface LoggerConstructOptions { eventTracker: EventTracker; } @@ -39,12 +34,7 @@ export class Logger { private localBackend: LocalLogBackend | null = null; private eventTracker: EventTracker; - private attached: { - [key in OutputStream]: { [containerId: string]: boolean } - } = { - [OutputStream.Stderr]: {}, - [OutputStream.Stdout]: {}, - }; + private containerLogs: { [containerId: string]: ContainerLogs } = {}; public constructor({ eventTracker }: LoggerConstructOptions) { this.backend = null; @@ -139,20 +129,24 @@ export class Logger { containerId: string, serviceInfo: { serviceId: number; imageId: number }, ): Bluebird { + // First detect if we already have an attached log stream + // for this container + if (containerId in this.containerLogs) { + return Bluebird.resolve(); + } + return Bluebird.using(this.lock(containerId), () => { - return this.attachStream( - docker, - OutputStream.Stdout, - containerId, - serviceInfo, - ).then(() => { - return this.attachStream( - docker, - OutputStream.Stderr, - containerId, - serviceInfo, - ); + const logs = new ContainerLogs(containerId, docker); + this.containerLogs[containerId] = logs; + logs.on('error', err => { + console.error(`Container log retrieval error: ${err}`); + delete this.containerLogs[containerId]; }); + logs.on('log', logMessage => { + this.log(_.merge({}, serviceInfo, logMessage)); + }); + logs.on('closed', () => delete this.containerLogs[containerId]); + return logs.attach(); }); } @@ -200,69 +194,6 @@ export class Logger { this.logSystemMessage(message, obj, eventName); } - // TODO: This function interacts with the docker daemon directly, - // using the container id, but it would be better if this was provided - // by the Compose/Service-Manager module, as an accessor - private attachStream( - docker: Docker, - streamType: OutputStream, - containerId: string, - { serviceId, imageId }: { serviceId: number; imageId: number }, - ): Bluebird { - return Bluebird.try(() => { - if (this.attached[streamType][containerId]) { - return; - } - - const logsOpts = { - follow: true, - stdout: streamType === OutputStream.Stdout, - stderr: streamType === OutputStream.Stderr, - timestamps: true, - since: Math.floor(Date.now() / 1000), - }; - - return docker - .getContainer(containerId) - .logs(logsOpts) - .then(stream => { - this.attached[streamType][containerId] = true; - - stream - .on('error', err => { - console.error('Error on container logs', err); - this.attached[streamType][containerId] = false; - }) - .pipe(es.split()) - .on('data', (logBuf: Buffer | string) => { - if (_.isString(logBuf)) { - logBuf = Buffer.from(logBuf); - } - const logMsg = Logger.extractContainerMessage(logBuf); - if (logMsg != null) { - const message: LogMessage = { - message: logMsg.message, - timestamp: logMsg.timestamp, - serviceId, - imageId, - }; - if (streamType === OutputStream.Stderr) { - message.isStdErr = true; - } - this.log(message); - } - }) - .on('error', err => { - console.error('Error on container logs', err); - this.attached[streamType][containerId] = false; - }) - .on('end', () => { - this.attached[streamType][containerId] = false; - }); - }); - }); - } - private objectNameForLogs(eventObj: LogEventObject): string | null { if (eventObj == null) { return null; @@ -294,30 +225,6 @@ export class Logger { return null; } - - private static extractContainerMessage( - msgBuf: Buffer, - ): { message: string; timestamp: number } | null { - // Non-tty message format from: - // https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach - if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) { - // Take the header from this message, and parse it as normal - msgBuf = msgBuf.slice(8); - } - const logLine = msgBuf.toString(); - const space = logLine.indexOf(' '); - if (space > 0) { - let timestamp = new Date(logLine.substr(0, space)).getTime(); - if (_.isNaN(timestamp)) { - timestamp = Date.now(); - } - return { - timestamp, - message: logLine.substr(space + 1), - }; - } - return null; - } } export default Logger; diff --git a/src/logging-backends/balena-backend.ts b/src/logging/balena-backend.ts similarity index 100% rename from src/logging-backends/balena-backend.ts rename to src/logging/balena-backend.ts diff --git a/src/logging/container.ts b/src/logging/container.ts new file mode 100644 index 00000000..b8489143 --- /dev/null +++ b/src/logging/container.ts @@ -0,0 +1,98 @@ +import * as es from 'event-stream'; +import { EventEmitter } from 'events'; +import * as _ from 'lodash'; +import * as Stream from 'stream'; +import StrictEventEmitter from 'strict-event-emitter-types'; + +import Docker from '../lib/docker-utils'; + +export interface ContainerLog { + message: string; + timestamp: number; + isStdout: boolean; +} + +interface LogsEvents { + log: ContainerLog; + closed: void; + error: Error; +} + +type LogsEventEmitter = StrictEventEmitter; + +export class ContainerLogs extends (EventEmitter as { + new (): LogsEventEmitter; +}) { + public constructor(public containerId: string, private docker: Docker) { + super(); + } + + public async attach() { + const logOpts = { + follow: true, + timestamps: true, + since: Math.floor(Date.now() / 1000), + }; + const stdoutLogOpts = { stdout: true, stderr: false, ...logOpts }; + const stderrLogOpts = { stderr: true, stdout: false, ...logOpts }; + + const container = this.docker.getContainer(this.containerId); + const stdoutStream = await container.logs(stdoutLogOpts); + const stderrStream = await container.logs(stderrLogOpts); + + [[stdoutStream, true], [stderrStream, false]].forEach( + ([stream, isStdout]: [Stream.Readable, boolean]) => { + stream + .on('error', err => { + this.emit( + 'error', + new Error(`Error on container logs: ${err} ${err.stack}`), + ); + }) + .pipe(es.split()) + .on('data', (logBuf: Buffer | string) => { + if (_.isString(logBuf)) { + logBuf = Buffer.from(logBuf); + } + const logMsg = ContainerLogs.extractMessage(logBuf); + if (logMsg != null) { + this.emit('log', { isStdout, ...logMsg }); + } + }) + .on('error', err => { + this.emit( + 'error', + new Error(`Error on container logs: ${err} ${err.stack}`), + ); + }) + .on('end', () => this.emit('closed')); + }, + ); + } + + private static extractMessage( + msgBuf: Buffer, + ): { message: string; timestamp: number } | null { + // Non-tty message format from: + // https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach + if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) { + // Take the header from this message, and parse it as normal + msgBuf = msgBuf.slice(8); + } + const logLine = msgBuf.toString(); + const space = logLine.indexOf(' '); + if (space > 0) { + let timestamp = new Date(logLine.substr(0, space)).getTime(); + if (_.isNaN(timestamp)) { + timestamp = Date.now(); + } + return { + timestamp, + message: logLine.substr(space + 1), + }; + } + return null; + } +} + +export default ContainerLogs; diff --git a/src/logging-backends/index.ts b/src/logging/index.ts similarity index 54% rename from src/logging-backends/index.ts rename to src/logging/index.ts index e39dd9e6..e341845a 100644 --- a/src/logging-backends/index.ts +++ b/src/logging/index.ts @@ -1,5 +1,12 @@ +import { BalenaLogBackend } from './balena-backend'; +import ContainerLogs from './container'; import { LocalLogBackend } from './local-backend'; import { LogBackend, LogMessage } from './log-backend'; -import { BalenaLogBackend } from './balena-backend'; -export { LocalLogBackend, LogBackend, LogMessage, BalenaLogBackend }; +export { + ContainerLogs, + LocalLogBackend, + LogBackend, + LogMessage, + BalenaLogBackend, +}; diff --git a/src/logging-backends/local-backend.ts b/src/logging/local-backend.ts similarity index 100% rename from src/logging-backends/local-backend.ts rename to src/logging/local-backend.ts diff --git a/src/logging-backends/log-backend.ts b/src/logging/log-backend.ts similarity index 100% rename from src/logging-backends/log-backend.ts rename to src/logging/log-backend.ts