From 2c83864f22b01e7196f6ed321c96f6cb9f40593b Mon Sep 17 00:00:00 2001 From: Thomas Manning Date: Fri, 23 Oct 2020 19:42:15 +1000 Subject: [PATCH] Change log source from docker to journalctl Change-type: minor Signed-off-by: Thomas Manning --- src/compose/service-manager.ts | 5 +- src/lib/journald.ts | 17 ++- src/logger.ts | 32 +---- src/logging/monitor.ts | 228 ++++++++++++++++++++++++++------- src/supervisor.ts | 3 + 5 files changed, 208 insertions(+), 77 deletions(-) diff --git a/src/compose/service-manager.ts b/src/compose/service-manager.ts index 5059f168..d3aa8911 100644 --- a/src/compose/service-manager.ts +++ b/src/compose/service-manager.ts @@ -24,6 +24,7 @@ import { Service } from './service'; import { serviceNetworksToDockerNetworks } from './utils'; import log from '../lib/supervisor-console'; +import logMonitor from '../logging/monitor'; interface ServiceManagerEvents { change: void; @@ -383,7 +384,7 @@ export function listenToEvents() { parser.on('data', async (data: { status: string; id: string }) => { if (data != null) { const status = data.status; - if (status === 'die' || status === 'start') { + if (status === 'die' || status === 'start' || status === 'destroy') { try { let service: Service | null = null; try { @@ -415,6 +416,8 @@ export function listenToEvents() { serviceId, imageId, }); + } else if (status === 'destroy') { + await logMonitor.detach(data.id); } } } catch (e) { diff --git a/src/lib/journald.ts b/src/lib/journald.ts index 28ab487a..219433dd 100644 --- a/src/lib/journald.ts +++ b/src/lib/journald.ts @@ -6,10 +6,12 @@ import log from './supervisor-console'; export function spawnJournalctl(opts: { all: boolean; follow: boolean; - count?: number; + count?: number | 'all'; unit?: string; containerId?: string; format: string; + filterString?: string; + since?: number; }): ChildProcess { const args = [ // The directory we want to run the chroot from @@ -34,9 +36,22 @@ export function spawnJournalctl(opts: { args.push('-n'); args.push(opts.count.toString()); } + if (opts.since != null) { + args.push('-S'); + args.push( + new Date(opts.since) + .toISOString() + .replace(/T/, ' ') // replace T with a space + .replace(/\..+/, ''), // delete the dot and everything after + ); + } args.push('-o'); args.push(opts.format); + if (opts.filterString) { + args.push(opts.filterString); + } + log.debug('Spawning journald with: chroot ', args.join(' ')); const journald = spawn('chroot', args, { diff --git a/src/logger.ts b/src/logger.ts index 9f178e36..66ce4c6d 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -8,12 +8,11 @@ import { LogType } from './lib/log-types'; import { writeLock } from './lib/update-lock'; import { BalenaLogBackend, - ContainerLogs, LocalLogBackend, LogBackend, LogMessage, } from './logging'; -import LogMonitor from './logging/monitor'; +import logMonitor from './logging/monitor'; import * as globalEventBus from './event-bus'; import superConsole from './lib/supervisor-console'; @@ -25,9 +24,6 @@ let backend: LogBackend | null = null; let balenaBackend: BalenaLogBackend | null = null; let localBackend: LocalLogBackend | null = null; -const containerLogs: { [containerId: string]: ContainerLogs } = {}; -const logMonitor = new LogMonitor(); - export const initialized = (async () => { await config.initialized; const { @@ -149,34 +145,14 @@ export function attach( ): Bluebird { // First detect if we already have an attached log stream // for this container - if (containerId in containerLogs) { + if (logMonitor.isAttached(containerId)) { return Bluebird.resolve(); } return Bluebird.using(lock(containerId), async () => { - const logs = new ContainerLogs(containerId); - containerLogs[containerId] = logs; - logs.on('error', (err) => { - superConsole.error('Container log retrieval error', err); - delete containerLogs[containerId]; + logMonitor.attach(containerId, (message) => { + log({ ...serviceInfo, ...message }); }); - logs.on('log', async (logMessage) => { - log(_.merge({}, serviceInfo, logMessage)); - - // Take the timestamp and set it in the database as the last - // log sent for this - logMonitor.updateContainerSentTimestamp( - containerId, - logMessage.timestamp, - ); - }); - - logs.on('closed', () => delete containerLogs[containerId]); - - const lastSentTimestamp = await logMonitor.getContainerSentTimestamp( - containerId, - ); - return logs.attach(lastSentTimestamp); }); } diff --git a/src/logging/monitor.ts b/src/logging/monitor.ts index 61e9ae46..f0b4ad06 100644 --- a/src/logging/monitor.ts +++ b/src/logging/monitor.ts @@ -1,79 +1,211 @@ -import * as _ from 'lodash'; +import * as JSONstream from 'JSONStream'; +import { delay } from 'bluebird'; import * as db from '../db'; - +import { spawnJournalctl } from '../lib/journald'; import log from '../lib/supervisor-console'; +export type MonitorHook = ({ + message, + isStdErr, +}: { + message: string; + isStdErr: boolean; + timestamp: number; +}) => Resolvable; + +// This is nowhere near the amount of fields provided by journald, but simply the ones +// that we are interested in +interface JournalRow { + CONTAINER_ID_FULL?: string; + CONTAINER_NAME?: string; + MESSAGE: string | number[]; + PRIORITY: string; + __REALTIME_TIMESTAMP: string; +} + // Flush every 10 mins const DB_FLUSH_INTERVAL = 10 * 60 * 1000; +// Wait 5s when journalctl failed before trying to read the logs again +const JOURNALCTL_ERROR_RETRY_DELAY = 5000; + +function messageFieldToString(entry: JournalRow['MESSAGE']): string | null { + if (Array.isArray(entry)) { + return String.fromCharCode(...entry); + } else if (typeof entry === 'string') { + return entry; + } else { + log.error( + `Unknown journald message field type: ${typeof entry}. Dropping log.`, + ); + return null; + } +} + /** - * This class provides a wrapper around the database for - * saving the last timestamp of a container + * Streams logs from journalctl and calls container hooks when a record is received matching container id */ -export class LogMonitor { - private timestamps: { [containerId: string]: number } = {}; - private writeRequired: { [containerId: string]: boolean } = {}; +class LogMonitor { + private containers: { + [containerId: string]: { + hook: MonitorHook; + follow: boolean; + timestamp: number; + writeRequired: boolean; + }; + } = {}; public constructor() { setInterval(() => this.flushDb(), DB_FLUSH_INTERVAL); } - public updateContainerSentTimestamp( + public start() { + this.streamLogsFromJournal( + { + all: true, + follow: true, + format: 'json', + filterString: '_SYSTEMD_UNIT=balena.service', + }, + (row: JournalRow) => { + if (row.CONTAINER_ID_FULL && this.containers[row.CONTAINER_ID_FULL]) { + this.handleRow(row); + } + }, + async (data: Buffer) => { + log.error( + 'Non-empty stderr stream from journalctl log fetching: ', + data.toString(), + ); + await delay(JOURNALCTL_ERROR_RETRY_DELAY); + this.start(); + }, + ); + } + + public isAttached(containerId: string): boolean { + return containerId in this.containers; + } + + public async attach(containerId: string, hook: MonitorHook) { + if (!this.containers[containerId]) { + this.containers[containerId] = { + hook, + follow: false, + timestamp: Date.now(), + writeRequired: false, + }; + this.containers[ + containerId + ].timestamp = await this.getContainerSentTimestamp(containerId); + this.backfill(containerId, this.containers[containerId].timestamp); + } + } + + public async detach(containerId: string) { + delete this.containers[containerId]; + await db.models('containerLogs').delete().where({ containerId }); + } + + private streamLogsFromJournal( + options: Parameters[0], + onRow: (row: JournalRow) => void, + onError: (data: Buffer) => void, + onClose?: () => void, + ): ReturnType { + const journalctl = spawnJournalctl(options); + journalctl.stdout?.pipe(JSONstream.parse(true).on('data', onRow)); + journalctl.stderr?.on('data', onError); + if (onClose) { + journalctl.on('close', onClose); + } + return journalctl; + } + + /** + * stream logs from lastSentTimestamp until now so logs are not missed if the container started before supervisor + */ + private backfill(containerId: string, lastSentTimestamp: number) { + this.streamLogsFromJournal( + { + all: true, + follow: false, + format: 'json', + filterString: `CONTAINER_ID_FULL=${containerId}`, + since: lastSentTimestamp + 1, // increment to exclude last sent log + }, + (row: JournalRow) => this.handleRow(row), + async (data: Buffer) => { + log.error('journalctl backfill error: ', data.toString()); + }, + () => { + this.containers[containerId].follow = true; + }, + ); + } + + private handleRow(row: JournalRow) { + if (row.CONTAINER_ID_FULL && row.CONTAINER_NAME !== 'resin_supervisor') { + const containerId = row.CONTAINER_ID_FULL; + const message = messageFieldToString(row.MESSAGE); + const isStdErr = row.PRIORITY === '3'; + const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds + if (message != null && this.containers[containerId]) { + this.updateContainerSentTimestamp(containerId, timestamp); + this.containers[containerId].hook({ message, isStdErr, timestamp }); + } + } + } + + private updateContainerSentTimestamp( containerId: string, timestamp: number, ): void { - this.timestamps[containerId] = timestamp; - this.writeRequired[containerId] = true; + this.containers[containerId].timestamp = timestamp; + this.containers[containerId].writeRequired = true; } - public async getContainerSentTimestamp(containerId: string): Promise { - // If this is the first time we are requesting the - // timestamp for this container, request it from the db - if (this.timestamps[containerId] == null) { - // Set the timestamp to 0 before interacting with the - // db to avoid multiple db actions at once - this.timestamps[containerId] = 0; - try { - const timestampObj = await db - .models('containerLogs') - .select('lastSentTimestamp') - .where({ containerId }); + private async getContainerSentTimestamp( + containerId: string, + ): Promise { + try { + const row = await db + .models('containerLogs') + .select('lastSentTimestamp') + .where({ containerId }) + .first(); - if (timestampObj == null || _.isEmpty(timestampObj)) { - // Create a row in the db so there's something to - // update - await db - .models('containerLogs') - .insert({ containerId, lastSentTimestamp: 0 }); - } else { - this.timestamps[containerId] = timestampObj[0].lastSentTimestamp; - } - } catch (e) { - log.error( - 'There was an error retrieving the container log timestamps:', - e, - ); + if (!row) { + const now = Date.now(); + await db + .models('containerLogs') + .insert({ containerId, lastSentTimestamp: now }); + return now; + } else { + return row.lastSentTimestamp; } + } catch (e) { + log.error( + 'There was an error retrieving the container log timestamps:', + e, + ); + return Date.now(); } - return this.timestamps[containerId] || 0; } private async flushDb() { log.debug('Attempting container log timestamp flush...'); - const containerIds = Object.getOwnPropertyNames(this.timestamps); try { - for (const containerId of containerIds) { + for (const containerId of Object.keys(this.containers)) { // Avoid writing to the db if we don't need to - if (!this.writeRequired[containerId]) { + if (!this.containers[containerId].writeRequired) { continue; } - - await db - .models('containerLogs') - .where({ containerId }) - .update({ lastSentTimestamp: this.timestamps[containerId] }); - this.writeRequired[containerId] = false; + await db.models('containerLogs').where({ containerId }).update({ + lastSentTimestamp: this.containers[containerId].timestamp, + }); + this.containers[containerId].writeRequired = false; } } catch (e) { log.error('There was an error storing the container log timestamps:', e); @@ -82,4 +214,6 @@ export class LogMonitor { } } -export default LogMonitor; +const logMonitor = new LogMonitor(); + +export default logMonitor; diff --git a/src/supervisor.ts b/src/supervisor.ts index 7827bb7e..63d4aa43 100644 --- a/src/supervisor.ts +++ b/src/supervisor.ts @@ -14,6 +14,7 @@ import version = require('./lib/supervisor-version'); import * as avahi from './lib/avahi'; import * as firewall from './lib/firewall'; +import logMonitor from './logging/monitor'; const startupConfigFields: config.ConfigKey[] = [ 'uuid', @@ -75,6 +76,8 @@ export class Supervisor { deviceState.on('shutdown', () => this.api.stop()); await apiBinder.start(); + + logMonitor.start(); } }