Change log source from docker to journalctl

Change-type: minor
Signed-off-by: Thomas Manning <thomasm@balena.io>
This commit is contained in:
Thomas Manning 2020-10-23 19:42:15 +10:00
parent 622e8edec1
commit 2c83864f22
5 changed files with 208 additions and 77 deletions

View File

@ -24,6 +24,7 @@ import { Service } from './service';
import { serviceNetworksToDockerNetworks } from './utils';
import log from '../lib/supervisor-console';
import logMonitor from '../logging/monitor';
interface ServiceManagerEvents {
change: void;
@ -383,7 +384,7 @@ export function listenToEvents() {
parser.on('data', async (data: { status: string; id: string }) => {
if (data != null) {
const status = data.status;
if (status === 'die' || status === 'start') {
if (status === 'die' || status === 'start' || status === 'destroy') {
try {
let service: Service | null = null;
try {
@ -415,6 +416,8 @@ export function listenToEvents() {
serviceId,
imageId,
});
} else if (status === 'destroy') {
await logMonitor.detach(data.id);
}
}
} catch (e) {

View File

@ -6,10 +6,12 @@ import log from './supervisor-console';
export function spawnJournalctl(opts: {
all: boolean;
follow: boolean;
count?: number;
count?: number | 'all';
unit?: string;
containerId?: string;
format: string;
filterString?: string;
since?: number;
}): ChildProcess {
const args = [
// The directory we want to run the chroot from
@ -34,9 +36,22 @@ export function spawnJournalctl(opts: {
args.push('-n');
args.push(opts.count.toString());
}
if (opts.since != null) {
args.push('-S');
args.push(
new Date(opts.since)
.toISOString()
.replace(/T/, ' ') // replace T with a space
.replace(/\..+/, ''), // delete the dot and everything after
);
}
args.push('-o');
args.push(opts.format);
if (opts.filterString) {
args.push(opts.filterString);
}
log.debug('Spawning journald with: chroot ', args.join(' '));
const journald = spawn('chroot', args, {

View File

@ -8,12 +8,11 @@ import { LogType } from './lib/log-types';
import { writeLock } from './lib/update-lock';
import {
BalenaLogBackend,
ContainerLogs,
LocalLogBackend,
LogBackend,
LogMessage,
} from './logging';
import LogMonitor from './logging/monitor';
import logMonitor from './logging/monitor';
import * as globalEventBus from './event-bus';
import superConsole from './lib/supervisor-console';
@ -25,9 +24,6 @@ let backend: LogBackend | null = null;
let balenaBackend: BalenaLogBackend | null = null;
let localBackend: LocalLogBackend | null = null;
const containerLogs: { [containerId: string]: ContainerLogs } = {};
const logMonitor = new LogMonitor();
export const initialized = (async () => {
await config.initialized;
const {
@ -149,34 +145,14 @@ export function attach(
): Bluebird<void> {
// First detect if we already have an attached log stream
// for this container
if (containerId in containerLogs) {
if (logMonitor.isAttached(containerId)) {
return Bluebird.resolve();
}
return Bluebird.using(lock(containerId), async () => {
const logs = new ContainerLogs(containerId);
containerLogs[containerId] = logs;
logs.on('error', (err) => {
superConsole.error('Container log retrieval error', err);
delete containerLogs[containerId];
logMonitor.attach(containerId, (message) => {
log({ ...serviceInfo, ...message });
});
logs.on('log', async (logMessage) => {
log(_.merge({}, serviceInfo, logMessage));
// Take the timestamp and set it in the database as the last
// log sent for this
logMonitor.updateContainerSentTimestamp(
containerId,
logMessage.timestamp,
);
});
logs.on('closed', () => delete containerLogs[containerId]);
const lastSentTimestamp = await logMonitor.getContainerSentTimestamp(
containerId,
);
return logs.attach(lastSentTimestamp);
});
}

View File

@ -1,79 +1,211 @@
import * as _ from 'lodash';
import * as JSONstream from 'JSONStream';
import { delay } from 'bluebird';
import * as db from '../db';
import { spawnJournalctl } from '../lib/journald';
import log from '../lib/supervisor-console';
export type MonitorHook = ({
message,
isStdErr,
}: {
message: string;
isStdErr: boolean;
timestamp: number;
}) => Resolvable<void>;
// This is nowhere near the amount of fields provided by journald, but simply the ones
// that we are interested in
interface JournalRow {
CONTAINER_ID_FULL?: string;
CONTAINER_NAME?: string;
MESSAGE: string | number[];
PRIORITY: string;
__REALTIME_TIMESTAMP: string;
}
// Flush every 10 mins
const DB_FLUSH_INTERVAL = 10 * 60 * 1000;
// Wait 5s when journalctl failed before trying to read the logs again
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
function messageFieldToString(entry: JournalRow['MESSAGE']): string | null {
if (Array.isArray(entry)) {
return String.fromCharCode(...entry);
} else if (typeof entry === 'string') {
return entry;
} else {
log.error(
`Unknown journald message field type: ${typeof entry}. Dropping log.`,
);
return null;
}
}
/**
* This class provides a wrapper around the database for
* saving the last timestamp of a container
* Streams logs from journalctl and calls container hooks when a record is received matching container id
*/
export class LogMonitor {
private timestamps: { [containerId: string]: number } = {};
private writeRequired: { [containerId: string]: boolean } = {};
class LogMonitor {
private containers: {
[containerId: string]: {
hook: MonitorHook;
follow: boolean;
timestamp: number;
writeRequired: boolean;
};
} = {};
public constructor() {
setInterval(() => this.flushDb(), DB_FLUSH_INTERVAL);
}
public updateContainerSentTimestamp(
public start() {
this.streamLogsFromJournal(
{
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
},
(row: JournalRow) => {
if (row.CONTAINER_ID_FULL && this.containers[row.CONTAINER_ID_FULL]) {
this.handleRow(row);
}
},
async (data: Buffer) => {
log.error(
'Non-empty stderr stream from journalctl log fetching: ',
data.toString(),
);
await delay(JOURNALCTL_ERROR_RETRY_DELAY);
this.start();
},
);
}
public isAttached(containerId: string): boolean {
return containerId in this.containers;
}
public async attach(containerId: string, hook: MonitorHook) {
if (!this.containers[containerId]) {
this.containers[containerId] = {
hook,
follow: false,
timestamp: Date.now(),
writeRequired: false,
};
this.containers[
containerId
].timestamp = await this.getContainerSentTimestamp(containerId);
this.backfill(containerId, this.containers[containerId].timestamp);
}
}
public async detach(containerId: string) {
delete this.containers[containerId];
await db.models('containerLogs').delete().where({ containerId });
}
private streamLogsFromJournal(
options: Parameters<typeof spawnJournalctl>[0],
onRow: (row: JournalRow) => void,
onError: (data: Buffer) => void,
onClose?: () => void,
): ReturnType<typeof spawnJournalctl> {
const journalctl = spawnJournalctl(options);
journalctl.stdout?.pipe(JSONstream.parse(true).on('data', onRow));
journalctl.stderr?.on('data', onError);
if (onClose) {
journalctl.on('close', onClose);
}
return journalctl;
}
/**
* stream logs from lastSentTimestamp until now so logs are not missed if the container started before supervisor
*/
private backfill(containerId: string, lastSentTimestamp: number) {
this.streamLogsFromJournal(
{
all: true,
follow: false,
format: 'json',
filterString: `CONTAINER_ID_FULL=${containerId}`,
since: lastSentTimestamp + 1, // increment to exclude last sent log
},
(row: JournalRow) => this.handleRow(row),
async (data: Buffer) => {
log.error('journalctl backfill error: ', data.toString());
},
() => {
this.containers[containerId].follow = true;
},
);
}
private handleRow(row: JournalRow) {
if (row.CONTAINER_ID_FULL && row.CONTAINER_NAME !== 'resin_supervisor') {
const containerId = row.CONTAINER_ID_FULL;
const message = messageFieldToString(row.MESSAGE);
const isStdErr = row.PRIORITY === '3';
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
if (message != null && this.containers[containerId]) {
this.updateContainerSentTimestamp(containerId, timestamp);
this.containers[containerId].hook({ message, isStdErr, timestamp });
}
}
}
private updateContainerSentTimestamp(
containerId: string,
timestamp: number,
): void {
this.timestamps[containerId] = timestamp;
this.writeRequired[containerId] = true;
this.containers[containerId].timestamp = timestamp;
this.containers[containerId].writeRequired = true;
}
public async getContainerSentTimestamp(containerId: string): Promise<number> {
// If this is the first time we are requesting the
// timestamp for this container, request it from the db
if (this.timestamps[containerId] == null) {
// Set the timestamp to 0 before interacting with the
// db to avoid multiple db actions at once
this.timestamps[containerId] = 0;
try {
const timestampObj = await db
.models('containerLogs')
.select('lastSentTimestamp')
.where({ containerId });
private async getContainerSentTimestamp(
containerId: string,
): Promise<number> {
try {
const row = await db
.models('containerLogs')
.select('lastSentTimestamp')
.where({ containerId })
.first();
if (timestampObj == null || _.isEmpty(timestampObj)) {
// Create a row in the db so there's something to
// update
await db
.models('containerLogs')
.insert({ containerId, lastSentTimestamp: 0 });
} else {
this.timestamps[containerId] = timestampObj[0].lastSentTimestamp;
}
} catch (e) {
log.error(
'There was an error retrieving the container log timestamps:',
e,
);
if (!row) {
const now = Date.now();
await db
.models('containerLogs')
.insert({ containerId, lastSentTimestamp: now });
return now;
} else {
return row.lastSentTimestamp;
}
} catch (e) {
log.error(
'There was an error retrieving the container log timestamps:',
e,
);
return Date.now();
}
return this.timestamps[containerId] || 0;
}
private async flushDb() {
log.debug('Attempting container log timestamp flush...');
const containerIds = Object.getOwnPropertyNames(this.timestamps);
try {
for (const containerId of containerIds) {
for (const containerId of Object.keys(this.containers)) {
// Avoid writing to the db if we don't need to
if (!this.writeRequired[containerId]) {
if (!this.containers[containerId].writeRequired) {
continue;
}
await db
.models('containerLogs')
.where({ containerId })
.update({ lastSentTimestamp: this.timestamps[containerId] });
this.writeRequired[containerId] = false;
await db.models('containerLogs').where({ containerId }).update({
lastSentTimestamp: this.containers[containerId].timestamp,
});
this.containers[containerId].writeRequired = false;
}
} catch (e) {
log.error('There was an error storing the container log timestamps:', e);
@ -82,4 +214,6 @@ export class LogMonitor {
}
}
export default LogMonitor;
const logMonitor = new LogMonitor();
export default logMonitor;

View File

@ -14,6 +14,7 @@ import version = require('./lib/supervisor-version');
import * as avahi from './lib/avahi';
import * as firewall from './lib/firewall';
import logMonitor from './logging/monitor';
const startupConfigFields: config.ConfigKey[] = [
'uuid',
@ -75,6 +76,8 @@ export class Supervisor {
deviceState.on('shutdown', () => this.api.stop());
await apiBinder.start();
logMonitor.start();
}
}