Refactor container logging interface and rename logging-backends

Container logging is now handled by a class which attaches and emits
information from the container. We add these to the directory
logging-backends/, and rename it to logging/.

Change-type: minor
Signed-off-by: Cameron Diver <cameron@balena.io>
This commit is contained in:
Cameron Diver 2019-04-01 14:27:12 +01:00
parent 196f173e13
commit 25fd11bed3
No known key found for this signature in database
GPG Key ID: 49690ED87032539F
6 changed files with 126 additions and 114 deletions

View File

@ -1,5 +1,4 @@
import * as Bluebird from 'bluebird'; import * as Bluebird from 'bluebird';
import * as es from 'event-stream';
import * as _ from 'lodash'; import * as _ from 'lodash';
import { EventTracker } from './event-tracker'; import { EventTracker } from './event-tracker';
@ -8,10 +7,11 @@ import { LogType } from './lib/log-types';
import { writeLock } from './lib/update-lock'; import { writeLock } from './lib/update-lock';
import { import {
BalenaLogBackend, BalenaLogBackend,
ContainerLogs,
LocalLogBackend, LocalLogBackend,
LogBackend, LogBackend,
LogMessage, LogMessage,
} from './logging-backends'; } from './logging';
interface LoggerSetupOptions { interface LoggerSetupOptions {
apiEndpoint: string; apiEndpoint: string;
@ -24,11 +24,6 @@ interface LoggerSetupOptions {
type LogEventObject = Dictionary<any> | null; type LogEventObject = Dictionary<any> | null;
enum OutputStream {
Stdout,
Stderr,
}
interface LoggerConstructOptions { interface LoggerConstructOptions {
eventTracker: EventTracker; eventTracker: EventTracker;
} }
@ -39,12 +34,7 @@ export class Logger {
private localBackend: LocalLogBackend | null = null; private localBackend: LocalLogBackend | null = null;
private eventTracker: EventTracker; private eventTracker: EventTracker;
private attached: { private containerLogs: { [containerId: string]: ContainerLogs } = {};
[key in OutputStream]: { [containerId: string]: boolean }
} = {
[OutputStream.Stderr]: {},
[OutputStream.Stdout]: {},
};
public constructor({ eventTracker }: LoggerConstructOptions) { public constructor({ eventTracker }: LoggerConstructOptions) {
this.backend = null; this.backend = null;
@ -139,20 +129,24 @@ export class Logger {
containerId: string, containerId: string,
serviceInfo: { serviceId: number; imageId: number }, serviceInfo: { serviceId: number; imageId: number },
): Bluebird<void> { ): Bluebird<void> {
// First detect if we already have an attached log stream
// for this container
if (containerId in this.containerLogs) {
return Bluebird.resolve();
}
return Bluebird.using(this.lock(containerId), () => { return Bluebird.using(this.lock(containerId), () => {
return this.attachStream( const logs = new ContainerLogs(containerId, docker);
docker, this.containerLogs[containerId] = logs;
OutputStream.Stdout, logs.on('error', err => {
containerId, console.error(`Container log retrieval error: ${err}`);
serviceInfo, delete this.containerLogs[containerId];
).then(() => {
return this.attachStream(
docker,
OutputStream.Stderr,
containerId,
serviceInfo,
);
}); });
logs.on('log', logMessage => {
this.log(_.merge({}, serviceInfo, logMessage));
});
logs.on('closed', () => delete this.containerLogs[containerId]);
return logs.attach();
}); });
} }
@ -200,69 +194,6 @@ export class Logger {
this.logSystemMessage(message, obj, eventName); this.logSystemMessage(message, obj, eventName);
} }
// TODO: This function interacts with the docker daemon directly,
// using the container id, but it would be better if this was provided
// by the Compose/Service-Manager module, as an accessor
private attachStream(
docker: Docker,
streamType: OutputStream,
containerId: string,
{ serviceId, imageId }: { serviceId: number; imageId: number },
): Bluebird<void> {
return Bluebird.try(() => {
if (this.attached[streamType][containerId]) {
return;
}
const logsOpts = {
follow: true,
stdout: streamType === OutputStream.Stdout,
stderr: streamType === OutputStream.Stderr,
timestamps: true,
since: Math.floor(Date.now() / 1000),
};
return docker
.getContainer(containerId)
.logs(logsOpts)
.then(stream => {
this.attached[streamType][containerId] = true;
stream
.on('error', err => {
console.error('Error on container logs', err);
this.attached[streamType][containerId] = false;
})
.pipe(es.split())
.on('data', (logBuf: Buffer | string) => {
if (_.isString(logBuf)) {
logBuf = Buffer.from(logBuf);
}
const logMsg = Logger.extractContainerMessage(logBuf);
if (logMsg != null) {
const message: LogMessage = {
message: logMsg.message,
timestamp: logMsg.timestamp,
serviceId,
imageId,
};
if (streamType === OutputStream.Stderr) {
message.isStdErr = true;
}
this.log(message);
}
})
.on('error', err => {
console.error('Error on container logs', err);
this.attached[streamType][containerId] = false;
})
.on('end', () => {
this.attached[streamType][containerId] = false;
});
});
});
}
private objectNameForLogs(eventObj: LogEventObject): string | null { private objectNameForLogs(eventObj: LogEventObject): string | null {
if (eventObj == null) { if (eventObj == null) {
return null; return null;
@ -294,30 +225,6 @@ export class Logger {
return null; return null;
} }
private static extractContainerMessage(
msgBuf: Buffer,
): { message: string; timestamp: number } | null {
// Non-tty message format from:
// https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) {
// Take the header from this message, and parse it as normal
msgBuf = msgBuf.slice(8);
}
const logLine = msgBuf.toString();
const space = logLine.indexOf(' ');
if (space > 0) {
let timestamp = new Date(logLine.substr(0, space)).getTime();
if (_.isNaN(timestamp)) {
timestamp = Date.now();
}
return {
timestamp,
message: logLine.substr(space + 1),
};
}
return null;
}
} }
export default Logger; export default Logger;

98
src/logging/container.ts Normal file
View File

@ -0,0 +1,98 @@
import * as es from 'event-stream';
import { EventEmitter } from 'events';
import * as _ from 'lodash';
import * as Stream from 'stream';
import StrictEventEmitter from 'strict-event-emitter-types';
import Docker from '../lib/docker-utils';
export interface ContainerLog {
message: string;
timestamp: number;
isStdout: boolean;
}
interface LogsEvents {
log: ContainerLog;
closed: void;
error: Error;
}
type LogsEventEmitter = StrictEventEmitter<EventEmitter, LogsEvents>;
export class ContainerLogs extends (EventEmitter as {
new (): LogsEventEmitter;
}) {
public constructor(public containerId: string, private docker: Docker) {
super();
}
public async attach() {
const logOpts = {
follow: true,
timestamps: true,
since: Math.floor(Date.now() / 1000),
};
const stdoutLogOpts = { stdout: true, stderr: false, ...logOpts };
const stderrLogOpts = { stderr: true, stdout: false, ...logOpts };
const container = this.docker.getContainer(this.containerId);
const stdoutStream = await container.logs(stdoutLogOpts);
const stderrStream = await container.logs(stderrLogOpts);
[[stdoutStream, true], [stderrStream, false]].forEach(
([stream, isStdout]: [Stream.Readable, boolean]) => {
stream
.on('error', err => {
this.emit(
'error',
new Error(`Error on container logs: ${err} ${err.stack}`),
);
})
.pipe(es.split())
.on('data', (logBuf: Buffer | string) => {
if (_.isString(logBuf)) {
logBuf = Buffer.from(logBuf);
}
const logMsg = ContainerLogs.extractMessage(logBuf);
if (logMsg != null) {
this.emit('log', { isStdout, ...logMsg });
}
})
.on('error', err => {
this.emit(
'error',
new Error(`Error on container logs: ${err} ${err.stack}`),
);
})
.on('end', () => this.emit('closed'));
},
);
}
private static extractMessage(
msgBuf: Buffer,
): { message: string; timestamp: number } | null {
// Non-tty message format from:
// https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) {
// Take the header from this message, and parse it as normal
msgBuf = msgBuf.slice(8);
}
const logLine = msgBuf.toString();
const space = logLine.indexOf(' ');
if (space > 0) {
let timestamp = new Date(logLine.substr(0, space)).getTime();
if (_.isNaN(timestamp)) {
timestamp = Date.now();
}
return {
timestamp,
message: logLine.substr(space + 1),
};
}
return null;
}
}
export default ContainerLogs;

View File

@ -1,5 +1,12 @@
import { BalenaLogBackend } from './balena-backend';
import ContainerLogs from './container';
import { LocalLogBackend } from './local-backend'; import { LocalLogBackend } from './local-backend';
import { LogBackend, LogMessage } from './log-backend'; import { LogBackend, LogMessage } from './log-backend';
import { BalenaLogBackend } from './balena-backend';
export { LocalLogBackend, LogBackend, LogMessage, BalenaLogBackend }; export {
ContainerLogs,
LocalLogBackend,
LogBackend,
LogMessage,
BalenaLogBackend,
};