Mirror of https://github.com/balena-os/balena-supervisor.git (synced 2025-04-24 13:05:51 +00:00)

Merge pull request #955 from balena-io/937-better-logs

Better supervisor and container logging

Commit eeaa2fcfc9
@@ -75,6 +75,19 @@ module.exports = class ApplicationManager extends EventEmitter
		@_targetVolatilePerImageId = {}
		@_containerStarted = {}

		# Rather than relying on removing out of date database entries when we're no
		# longer using them, set a task that runs periodically to clear out the database
		# This has the advantage that if for some reason a container is removed while the
		# supervisor is down, we won't have zombie entries in the db
		setInterval =>
			@docker.listContainers(all: true).then (containers) =>
				@logger.clearOutOfDateDBLogs(_.map(containers, 'Id'))
		# Once a day
		, 1000 * 60 * 60 * 24
		# But also run it in the background on startup
		@docker.listContainers(all: true).then (containers) =>
			@logger.clearOutOfDateDBLogs(_.map(containers, 'Id'))

		@config.on 'change', (changedConfig) =>
			if changedConfig.appUpdatePollInterval
				@images.appUpdatePollInterval = changedConfig.appUpdatePollInterval
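The scheduling above boils down to a simple pattern: list every container the engine knows about and prune timestamp rows for the rest, once at startup and then once a day. A self-contained TypeScript sketch of that pattern, using plain dockerode in place of the supervisor's docker-utils wrapper and a hypothetical LogCleanup interface standing in for the Logger method added in this PR:

import * as Docker from 'dockerode';

// Hypothetical stand-in for the Logger method added in this PR.
interface LogCleanup {
	clearOutOfDateDBLogs(containerIds: string[]): Promise<void>;
}

const ONE_DAY_MS = 24 * 60 * 60 * 1000;

function scheduleLogDbCleanup(docker: Docker, logger: LogCleanup): void {
	const cleanup = async () => {
		// Collect the ids of every container the engine knows about (including
		// stopped ones) and drop timestamp rows for containers that no longer exist.
		const containers = await docker.listContainers({ all: true });
		await logger.clearOutOfDateDBLogs(containers.map(c => c.Id));
	};
	// Run once a day, and also once in the background on startup.
	setInterval(cleanup, ONE_DAY_MS);
	cleanup();
}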
@@ -364,9 +364,7 @@ export class ServiceManager extends (EventEmitter as {

			this.logger.attach(this.docker, container.id, { serviceId, imageId });

			if (alreadyStarted) {
				this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });
			} else {
			if (!alreadyStarted) {
				this.logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
			}

@@ -471,8 +469,6 @@ export class ServiceManager extends (EventEmitter as {
		const services = await this.getAll();
		for (const service of services) {
			if (service.status === 'Running') {
				this.logger.logSystemEvent(LogTypes.startServiceNoop, { service });

			const serviceId = service.serviceId;
			const imageId = service.imageId;
			if (serviceId == null || imageId == null) {
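Condensed, the start-path logging after this change looks roughly like the sketch below. The interfaces are minimal stand-ins for the real Logger and LogTypes modules; only the attach call and the removal of the "already running" noop event come from the diff.

// Minimal stand-ins for the real Logger and log-type objects.
interface MinimalLogger {
	attach(
		docker: unknown,
		containerId: string,
		serviceInfo: { serviceId: number; imageId: number },
	): void;
	logSystemEvent(logType: unknown, obj: unknown): void;
}

function reportServiceStart(
	logger: MinimalLogger,
	docker: unknown,
	containerId: string,
	service: { serviceId: number; imageId: number },
	alreadyStarted: boolean,
	startServiceSuccess: unknown,
): void {
	// Always (re)attach the log streams; Logger.attach is now idempotent because
	// it tracks which containers already have an attached stream.
	logger.attach(docker, containerId, {
		serviceId: service.serviceId,
		imageId: service.imageId,
	});

	// Only a genuine start is reported; the old "Service already running" noop
	// event was removed in this PR.
	if (!alreadyStarted) {
		logger.logSystemEvent(startServiceSuccess, { service });
	}
}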
@@ -92,10 +92,6 @@ export const startServiceSuccess: LogType = {
	eventName: 'Service started',
	humanName: 'Started service',
};
export const startServiceNoop: LogType = {
	eventName: 'Service already running',
	humanName: 'Service is already running',
};
export const startServiceError: LogType = {
	eventName: 'Service start error',
	humanName: 'Failed to start service',
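All of these entries share the same two-field shape. A minimal sketch of the LogType interface they imply; the real interface lives elsewhere in the repo and may carry more than these two fields.

// Assumed shape, reconstructed from the entries shown in the diff.
interface LogType {
	eventName: string; // short machine-oriented event name
	humanName: string; // human-readable text used when reporting the event
}

const startServiceError: LogType = {
	eventName: 'Service start error',
	humanName: 'Failed to start service',
};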
166 src/logger.ts
@@ -1,17 +1,18 @@
import * as Bluebird from 'bluebird';
import * as es from 'event-stream';
import * as _ from 'lodash';

import DB from './db';
import { EventTracker } from './event-tracker';
import Docker from './lib/docker-utils';
import { LogType } from './lib/log-types';
import { writeLock } from './lib/update-lock';
import {
	BalenaLogBackend,
	ContainerLogs,
	LocalLogBackend,
	LogBackend,
	LogMessage,
} from './logging-backends';
} from './logging';

interface LoggerSetupOptions {
	apiEndpoint: string;
@@ -24,12 +25,8 @@ interface LoggerSetupOptions {

type LogEventObject = Dictionary<any> | null;

enum OutputStream {
	Stdout,
	Stderr,
}

interface LoggerConstructOptions {
	db: DB;
	eventTracker: EventTracker;
}

@@ -39,16 +36,13 @@ export class Logger {
	private localBackend: LocalLogBackend | null = null;

	private eventTracker: EventTracker;
	private attached: {
		[key in OutputStream]: { [containerId: string]: boolean }
	} = {
		[OutputStream.Stderr]: {},
		[OutputStream.Stdout]: {},
	};
	private db: DB;
	private containerLogs: { [containerId: string]: ContainerLogs } = {};

	public constructor({ eventTracker }: LoggerConstructOptions) {
	public constructor({ db, eventTracker }: LoggerConstructOptions) {
		this.backend = null;
		this.eventTracker = eventTracker;
		this.db = db;
	}

	public init({
@@ -139,20 +133,47 @@ export class Logger {
		containerId: string,
		serviceInfo: { serviceId: number; imageId: number },
	): Bluebird<void> {
		return Bluebird.using(this.lock(containerId), () => {
			return this.attachStream(
				docker,
				OutputStream.Stdout,
				containerId,
				serviceInfo,
			).then(() => {
				return this.attachStream(
					docker,
					OutputStream.Stderr,
					containerId,
					serviceInfo,
				);
		// First detect if we already have an attached log stream
		// for this container
		if (containerId in this.containerLogs) {
			return Bluebird.resolve();
		}

		return Bluebird.using(this.lock(containerId), async () => {
			const logs = new ContainerLogs(containerId, docker);
			this.containerLogs[containerId] = logs;
			logs.on('error', err => {
				console.error(`Container log retrieval error: ${err}`);
				delete this.containerLogs[containerId];
			});
			logs.on('log', async logMessage => {
				this.log(_.merge({}, serviceInfo, logMessage));

				// Take the timestamp and set it in the database as the last
				// log sent for this container
				await this.db
					.models('containerLogs')
					.where({ containerId })
					.update({ lastSentTimestamp: logMessage.timestamp });
			});

			logs.on('closed', () => delete this.containerLogs[containerId]);

			// Get the timestamp of the last sent log for this container
			let [timestampObj] = await this.db
				.models('containerLogs')
				.select('lastSentTimestamp')
				.where({ containerId });

			if (timestampObj == null) {
				timestampObj = { lastSentTimestamp: 0 };
				// Create the row so we have something to update
				await this.db
					.models('containerLogs')
					.insert({ containerId, lastSentTimestamp: 0 });
			}
			const { lastSentTimestamp } = timestampObj;
			return logs.attach(lastSentTimestamp);
		});
	}

@@ -200,67 +221,12 @@ export class Logger {
		this.logSystemMessage(message, obj, eventName);
	}

	// TODO: This function interacts with the docker daemon directly,
	// using the container id, but it would be better if this was provided
	// by the Compose/Service-Manager module, as an accessor
	private attachStream(
		docker: Docker,
		streamType: OutputStream,
		containerId: string,
		{ serviceId, imageId }: { serviceId: number; imageId: number },
	): Bluebird<void> {
		return Bluebird.try(() => {
			if (this.attached[streamType][containerId]) {
				return;
			}

			const logsOpts = {
				follow: true,
				stdout: streamType === OutputStream.Stdout,
				stderr: streamType === OutputStream.Stderr,
				timestamps: true,
				since: Math.floor(Date.now() / 1000),
			};

			return docker
				.getContainer(containerId)
				.logs(logsOpts)
				.then(stream => {
					this.attached[streamType][containerId] = true;

					stream
						.on('error', err => {
							console.error('Error on container logs', err);
							this.attached[streamType][containerId] = false;
						})
						.pipe(es.split())
						.on('data', (logBuf: Buffer | string) => {
							if (_.isString(logBuf)) {
								logBuf = Buffer.from(logBuf);
							}
							const logMsg = Logger.extractContainerMessage(logBuf);
							if (logMsg != null) {
								const message: LogMessage = {
									message: logMsg.message,
									timestamp: logMsg.timestamp,
									serviceId,
									imageId,
								};
								if (streamType === OutputStream.Stderr) {
									message.isStdErr = true;
								}
								this.log(message);
							}
						})
						.on('error', err => {
							console.error('Error on container logs', err);
							this.attached[streamType][containerId] = false;
						})
						.on('end', () => {
							this.attached[streamType][containerId] = false;
						});
				});
		});
	public async clearOutOfDateDBLogs(containerIds: string[]) {
		console.log('Performing database cleanup for container log timestamps');
		await this.db
			.models('containerLogs')
			.whereNotIn('containerId', containerIds)
			.delete();
	}

	private objectNameForLogs(eventObj: LogEventObject): string | null {
@@ -294,30 +260,6 @@ export class Logger {

		return null;
	}

	private static extractContainerMessage(
		msgBuf: Buffer,
	): { message: string; timestamp: number } | null {
		// Non-tty message format from:
		// https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
		if (msgBuf[0] in [0, 1, 2] && _.every(msgBuf.slice(1, 7), c => c === 0)) {
			// Take the header from this message, and parse it as normal
			msgBuf = msgBuf.slice(8);
		}
		const logLine = msgBuf.toString();
		const space = logLine.indexOf(' ');
		if (space > 0) {
			let timestamp = new Date(logLine.substr(0, space)).getTime();
			if (_.isNaN(timestamp)) {
				timestamp = Date.now();
			}
			return {
				timestamp,
				message: logLine.substr(space + 1),
			};
		}
		return null;
	}
}

export default Logger;
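The db.models('containerLogs') calls in the new attach flow and in clearOutOfDateDBLogs boil down to three small queries against the table added by migration M00004. A sketch of the same bookkeeping with knex, assuming (based on how it is used in the diff) that the supervisor's db.models(name) simply returns a knex query builder bound to the named table:

import * as Knex from 'knex';

// Stand-in for the supervisor's DB wrapper: a function returning a query
// builder bound to the given table (an assumption for this sketch).
type Models = (tableName: string) => Knex.QueryBuilder;

// Read the last forwarded timestamp for a container, creating the row on first sight.
async function getLastSentTimestamp(models: Models, containerId: string): Promise<number> {
	const [row] = await models('containerLogs')
		.select('lastSentTimestamp')
		.where({ containerId });
	if (row == null) {
		await models('containerLogs').insert({ containerId, lastSentTimestamp: 0 });
		return 0;
	}
	return row.lastSentTimestamp;
}

// Record the timestamp of the most recently forwarded log line.
function setLastSentTimestamp(models: Models, containerId: string, timestamp: number) {
	return models('containerLogs')
		.where({ containerId })
		.update({ lastSentTimestamp: timestamp });
}

// Drop rows for containers the engine no longer knows about.
function clearOutOfDateRows(models: Models, liveContainerIds: string[]) {
	return models('containerLogs')
		.whereNotIn('containerId', liveContainerIds)
		.delete();
}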
101 src/logging/container.ts (new file)
@@ -0,0 +1,101 @@
import * as es from 'event-stream';
import { EventEmitter } from 'events';
import * as _ from 'lodash';
import * as Stream from 'stream';
import StrictEventEmitter from 'strict-event-emitter-types';

import Docker from '../lib/docker-utils';

export interface ContainerLog {
	message: string;
	timestamp: number;
	isStdout: boolean;
}

interface LogsEvents {
	log: ContainerLog;
	closed: void;
	error: Error;
}

type LogsEventEmitter = StrictEventEmitter<EventEmitter, LogsEvents>;

export class ContainerLogs extends (EventEmitter as {
	new (): LogsEventEmitter;
}) {
	public constructor(public containerId: string, private docker: Docker) {
		super();
	}

	public async attach(lastSentTimestamp: number) {
		const logOpts = {
			follow: true,
			timestamps: true,
			since: Math.floor(lastSentTimestamp / 1000),
		};
		const stdoutLogOpts = { stdout: true, stderr: false, ...logOpts };
		const stderrLogOpts = { stderr: true, stdout: false, ...logOpts };

		const container = this.docker.getContainer(this.containerId);
		const stdoutStream = await container.logs(stdoutLogOpts);
		const stderrStream = await container.logs(stderrLogOpts);

		[[stdoutStream, true], [stderrStream, false]].forEach(
			([stream, isStdout]: [Stream.Readable, boolean]) => {
				stream
					.on('error', err => {
						this.emit(
							'error',
							new Error(`Error on container logs: ${err} ${err.stack}`),
						);
					})
					.pipe(es.split())
					.on('data', (logBuf: Buffer | string) => {
						if (_.isString(logBuf)) {
							logBuf = Buffer.from(logBuf);
						}
						const logMsg = ContainerLogs.extractMessage(logBuf);
						if (logMsg != null) {
							this.emit('log', { isStdout, ...logMsg });
						}
					})
					.on('error', err => {
						this.emit(
							'error',
							new Error(`Error on container logs: ${err} ${err.stack}`),
						);
					})
					.on('end', () => this.emit('closed'));
			},
		);
	}

	private static extractMessage(
		msgBuf: Buffer,
	): { message: string; timestamp: number } | undefined {
		// Non-tty message format from:
		// https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
		if (
			_.includes([0, 1, 2], msgBuf[0]) &&
			_.every(msgBuf.slice(1, 7), c => c === 0)
		) {
			// Take the header from this message, and parse it as normal
			msgBuf = msgBuf.slice(8);
		}
		const logLine = msgBuf.toString();
		const space = logLine.indexOf(' ');
		if (space > 0) {
			let timestamp = new Date(logLine.substr(0, space)).getTime();
			if (_.isNaN(timestamp)) {
				timestamp = Date.now();
			}
			return {
				timestamp,
				message: logLine.substr(space + 1),
			};
		}
		return;
	}
}

export default ContainerLogs;
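The header handling in extractMessage mirrors Docker's non-tty multiplexed stream format: one byte for the stream type (0, 1 or 2), three reserved bytes, a four-byte payload length, then the payload, which here begins with an RFC 3339 timestamp. A small worked example of the same parsing, written as a standalone re-implementation for illustration (not the module above), using the same message the test suite uses:

// Standalone illustration of the parsing done by ContainerLogs.extractMessage.
function parseDockerLogLine(
	msgBuf: Buffer,
): { message: string; timestamp: number } | undefined {
	// Strip the 8-byte multiplexing header if present:
	// [stream type (0|1|2), reserved zeros, 4-byte length].
	if ([0, 1, 2].includes(msgBuf[0]) && msgBuf.slice(1, 7).every(c => c === 0)) {
		msgBuf = msgBuf.slice(8);
	}
	const line = msgBuf.toString();
	const space = line.indexOf(' ');
	if (space <= 0) {
		return;
	}
	// The payload starts with an RFC 3339 timestamp; fall back to "now" if it
	// does not parse.
	let timestamp = new Date(line.substr(0, space)).getTime();
	if (isNaN(timestamp)) {
		timestamp = Date.now();
	}
	return { timestamp, message: line.substr(space + 1) };
}

// Example: an stdout frame carrying a timestamped log line.
const payload = Buffer.from('2018-09-21T12:37:09.819134000Z this is the message');
const header = Buffer.from([1, 0, 0, 0, 0, 0, 0, payload.length]);
console.log(parseDockerLogLine(Buffer.concat([header, payload])));
// -> { timestamp: 1537533429819, message: 'this is the message' }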
@@ -1,5 +1,12 @@
import { BalenaLogBackend } from './balena-backend';
import ContainerLogs from './container';
import { LocalLogBackend } from './local-backend';
import { LogBackend, LogMessage } from './log-backend';
import { BalenaLogBackend } from './balena-backend';

export { LocalLogBackend, LogBackend, LogMessage, BalenaLogBackend };
export {
	ContainerLogs,
	LocalLogBackend,
	LogBackend,
	LogMessage,
	BalenaLogBackend,
};
10 src/migrations/M00004.js (new file)
@@ -0,0 +1,10 @@
exports.up = function(knex) {
	return knex.schema.createTable('containerLogs', table => {
		table.string('containerId');
		table.integer('lastSentTimestamp');
	});
};

exports.down = function(knex, Promise) {
	return Promise.reject(new Error('Not Implemented'));
};
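The down migration is deliberately left unimplemented. If a reversible variant were wanted, it would just drop the table, along the lines of this hypothetical sketch (written in TypeScript/ES-module style for consistency with the other sketches here; the real migration files use CommonJS exports):

import * as Knex from 'knex';

// Hypothetical reversible counterpart to M00004; the PR itself rejects rollback.
export async function down(knex: Knex): Promise<void> {
	await knex.schema.dropTableIfExists('containerLogs');
}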
@@ -30,7 +30,7 @@ module.exports = class Supervisor extends EventEmitter
		@db = new DB()
		@config = new Config({ @db })
		@eventTracker = new EventTracker()
		@logger = new Logger({ @eventTracker })
		@logger = new Logger({ @db, @eventTracker })
		@deviceState = new DeviceState({ @config, @db, @eventTracker, @logger })
		@apiBinder = new APIBinder({ @config, @db, @deviceState, @eventTracker })

@@ -60,12 +60,16 @@ module.exports = class Supervisor extends EventEmitter
		.then =>
			@config.getMany(startupConfigFields)
		.then (conf) =>
			# We can't print to the dashboard until the logger has started up,
			# so we leave a trail of breadcrumbs in the logs in case the runtime
			# fails to get to the first dashboard logs
			console.log('Starting event tracker')
			@eventTracker.init(conf)
			.then =>
				@eventTracker.track('Supervisor start')
			.then =>
				console.log('Starting up api binder')
				@apiBinder.initClient()
			.then =>
				console.log('Starting logging infrastructure')
				@logger.init({
					apiEndpoint: conf.apiEndpoint,
					uuid: conf.uuid,
@@ -74,6 +78,8 @@ module.exports = class Supervisor extends EventEmitter
					enableLogs: conf.loggingEnabled,
					localMode: conf.localMode
				})
			.then =>
				@logger.logSystemMessage('Supervisor starting', {}, 'Supervisor start')
			.then =>
				if conf.legacyAppsPresent
					console.log('Legacy app detected, running migration')
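The same startup ordering, condensed into a TypeScript sketch. The component interfaces are stand-ins; the sequence, the breadcrumb console.logs, and the "Supervisor starting" system message are what the diff adds. Fields of logger.init that the diff elides are left out.

// Stand-in interfaces for the supervisor components wired together above.
interface StartupComponents {
	eventTracker: { init(conf: any): Promise<void>; track(event: string): void };
	apiBinder: { initClient(): Promise<void> };
	logger: {
		init(opts: any): Promise<void>;
		logSystemMessage(msg: string, obj: object, eventName: string): void;
	};
}

async function startSupervisor(conf: any, c: StartupComponents): Promise<void> {
	// Breadcrumbs via console.log: the dashboard logger is not up yet, so these
	// only reach the supervisor's own stdout.
	console.log('Starting event tracker');
	await c.eventTracker.init(conf);
	c.eventTracker.track('Supervisor start');

	console.log('Starting up api binder');
	await c.apiBinder.initClient();

	console.log('Starting logging infrastructure');
	await c.logger.init({
		apiEndpoint: conf.apiEndpoint,
		uuid: conf.uuid,
		// ...remaining fields elided in the diff...
		enableLogs: conf.loggingEnabled,
		localMode: conf.localMode,
	});
	c.logger.logSystemMessage('Supervisor starting', {}, 'Supervisor start');
}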
@@ -8,6 +8,7 @@ m = require 'mochainon'
{ stub } = m.sinon

{ Logger } = require '../src/logger'
{ ContainerLogs } = require '../src/logging/container'
describe 'Logger', ->
	beforeEach ->
		@_req = new stream.PassThrough()
@@ -111,7 +112,7 @@ describe 'Logger', ->
		message = '\u0001\u0000\u0000\u0000\u0000\u0000\u0000?2018-09-21T12:37:09.819134000Z this is the message'
		buffer = Buffer.from(message)

		expect(Logger.extractContainerMessage(buffer)).to.deep.equal({
		expect(ContainerLogs.extractMessage(buffer)).to.deep.equal({
			message: 'this is the message',
			timestamp: 1537533429819
		})