mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-01-11 15:32:47 +00:00
Merge pull request #1489 from balena-io/codewithcheese/read-logs-from-journalctl
Change log source from docker to journalctl
This commit is contained in:
commit
0c0211aa70
@ -24,6 +24,7 @@ import { Service } from './service';
|
||||
import { serviceNetworksToDockerNetworks } from './utils';
|
||||
|
||||
import log from '../lib/supervisor-console';
|
||||
import logMonitor from '../logging/monitor';
|
||||
|
||||
interface ServiceManagerEvents {
|
||||
change: void;
|
||||
@ -383,7 +384,7 @@ export function listenToEvents() {
|
||||
parser.on('data', async (data: { status: string; id: string }) => {
|
||||
if (data != null) {
|
||||
const status = data.status;
|
||||
if (status === 'die' || status === 'start') {
|
||||
if (status === 'die' || status === 'start' || status === 'destroy') {
|
||||
try {
|
||||
let service: Service | null = null;
|
||||
try {
|
||||
@ -415,6 +416,8 @@ export function listenToEvents() {
|
||||
serviceId,
|
||||
imageId,
|
||||
});
|
||||
} else if (status === 'destroy') {
|
||||
await logMonitor.detach(data.id);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
|
@ -6,10 +6,12 @@ import log from './supervisor-console';
|
||||
export function spawnJournalctl(opts: {
|
||||
all: boolean;
|
||||
follow: boolean;
|
||||
count?: number;
|
||||
count?: number | 'all';
|
||||
unit?: string;
|
||||
containerId?: string;
|
||||
format: string;
|
||||
filterString?: string;
|
||||
since?: number;
|
||||
}): ChildProcess {
|
||||
const args = [
|
||||
// The directory we want to run the chroot from
|
||||
@ -34,9 +36,22 @@ export function spawnJournalctl(opts: {
|
||||
args.push('-n');
|
||||
args.push(opts.count.toString());
|
||||
}
|
||||
if (opts.since != null) {
|
||||
args.push('-S');
|
||||
args.push(
|
||||
new Date(opts.since)
|
||||
.toISOString()
|
||||
.replace(/T/, ' ') // replace T with a space
|
||||
.replace(/\..+/, ''), // delete the dot and everything after
|
||||
);
|
||||
}
|
||||
args.push('-o');
|
||||
args.push(opts.format);
|
||||
|
||||
if (opts.filterString) {
|
||||
args.push(opts.filterString);
|
||||
}
|
||||
|
||||
log.debug('Spawning journald with: chroot ', args.join(' '));
|
||||
|
||||
const journald = spawn('chroot', args, {
|
||||
|
@ -8,12 +8,11 @@ import { LogType } from './lib/log-types';
|
||||
import { writeLock } from './lib/update-lock';
|
||||
import {
|
||||
BalenaLogBackend,
|
||||
ContainerLogs,
|
||||
LocalLogBackend,
|
||||
LogBackend,
|
||||
LogMessage,
|
||||
} from './logging';
|
||||
import LogMonitor from './logging/monitor';
|
||||
import logMonitor from './logging/monitor';
|
||||
|
||||
import * as globalEventBus from './event-bus';
|
||||
import superConsole from './lib/supervisor-console';
|
||||
@ -25,9 +24,6 @@ let backend: LogBackend | null = null;
|
||||
let balenaBackend: BalenaLogBackend | null = null;
|
||||
let localBackend: LocalLogBackend | null = null;
|
||||
|
||||
const containerLogs: { [containerId: string]: ContainerLogs } = {};
|
||||
const logMonitor = new LogMonitor();
|
||||
|
||||
export const initialized = (async () => {
|
||||
await config.initialized;
|
||||
const {
|
||||
@ -149,34 +145,14 @@ export function attach(
|
||||
): Bluebird<void> {
|
||||
// First detect if we already have an attached log stream
|
||||
// for this container
|
||||
if (containerId in containerLogs) {
|
||||
if (logMonitor.isAttached(containerId)) {
|
||||
return Bluebird.resolve();
|
||||
}
|
||||
|
||||
return Bluebird.using(lock(containerId), async () => {
|
||||
const logs = new ContainerLogs(containerId);
|
||||
containerLogs[containerId] = logs;
|
||||
logs.on('error', (err) => {
|
||||
superConsole.error('Container log retrieval error', err);
|
||||
delete containerLogs[containerId];
|
||||
logMonitor.attach(containerId, (message) => {
|
||||
log({ ...serviceInfo, ...message });
|
||||
});
|
||||
logs.on('log', async (logMessage) => {
|
||||
log(_.merge({}, serviceInfo, logMessage));
|
||||
|
||||
// Take the timestamp and set it in the database as the last
|
||||
// log sent for this
|
||||
logMonitor.updateContainerSentTimestamp(
|
||||
containerId,
|
||||
logMessage.timestamp,
|
||||
);
|
||||
});
|
||||
|
||||
logs.on('closed', () => delete containerLogs[containerId]);
|
||||
|
||||
const lastSentTimestamp = await logMonitor.getContainerSentTimestamp(
|
||||
containerId,
|
||||
);
|
||||
return logs.attach(lastSentTimestamp);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1,79 +1,211 @@
|
||||
import * as _ from 'lodash';
|
||||
import * as JSONstream from 'JSONStream';
|
||||
import { delay } from 'bluebird';
|
||||
|
||||
import * as db from '../db';
|
||||
|
||||
import { spawnJournalctl } from '../lib/journald';
|
||||
import log from '../lib/supervisor-console';
|
||||
|
||||
export type MonitorHook = ({
|
||||
message,
|
||||
isStdErr,
|
||||
}: {
|
||||
message: string;
|
||||
isStdErr: boolean;
|
||||
timestamp: number;
|
||||
}) => Resolvable<void>;
|
||||
|
||||
// This is nowhere near the amount of fields provided by journald, but simply the ones
|
||||
// that we are interested in
|
||||
interface JournalRow {
|
||||
CONTAINER_ID_FULL?: string;
|
||||
CONTAINER_NAME?: string;
|
||||
MESSAGE: string | number[];
|
||||
PRIORITY: string;
|
||||
__REALTIME_TIMESTAMP: string;
|
||||
}
|
||||
|
||||
// Flush every 10 mins
|
||||
const DB_FLUSH_INTERVAL = 10 * 60 * 1000;
|
||||
|
||||
// Wait 5s when journalctl failed before trying to read the logs again
|
||||
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
|
||||
|
||||
function messageFieldToString(entry: JournalRow['MESSAGE']): string | null {
|
||||
if (Array.isArray(entry)) {
|
||||
return String.fromCharCode(...entry);
|
||||
} else if (typeof entry === 'string') {
|
||||
return entry;
|
||||
} else {
|
||||
log.error(
|
||||
`Unknown journald message field type: ${typeof entry}. Dropping log.`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class provides a wrapper around the database for
|
||||
* saving the last timestamp of a container
|
||||
* Streams logs from journalctl and calls container hooks when a record is received matching container id
|
||||
*/
|
||||
export class LogMonitor {
|
||||
private timestamps: { [containerId: string]: number } = {};
|
||||
private writeRequired: { [containerId: string]: boolean } = {};
|
||||
class LogMonitor {
|
||||
private containers: {
|
||||
[containerId: string]: {
|
||||
hook: MonitorHook;
|
||||
follow: boolean;
|
||||
timestamp: number;
|
||||
writeRequired: boolean;
|
||||
};
|
||||
} = {};
|
||||
|
||||
public constructor() {
|
||||
setInterval(() => this.flushDb(), DB_FLUSH_INTERVAL);
|
||||
}
|
||||
|
||||
public updateContainerSentTimestamp(
|
||||
public start() {
|
||||
this.streamLogsFromJournal(
|
||||
{
|
||||
all: true,
|
||||
follow: true,
|
||||
format: 'json',
|
||||
filterString: '_SYSTEMD_UNIT=balena.service',
|
||||
},
|
||||
(row: JournalRow) => {
|
||||
if (row.CONTAINER_ID_FULL && this.containers[row.CONTAINER_ID_FULL]) {
|
||||
this.handleRow(row);
|
||||
}
|
||||
},
|
||||
async (data: Buffer) => {
|
||||
log.error(
|
||||
'Non-empty stderr stream from journalctl log fetching: ',
|
||||
data.toString(),
|
||||
);
|
||||
await delay(JOURNALCTL_ERROR_RETRY_DELAY);
|
||||
this.start();
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
public isAttached(containerId: string): boolean {
|
||||
return containerId in this.containers;
|
||||
}
|
||||
|
||||
public async attach(containerId: string, hook: MonitorHook) {
|
||||
if (!this.containers[containerId]) {
|
||||
this.containers[containerId] = {
|
||||
hook,
|
||||
follow: false,
|
||||
timestamp: Date.now(),
|
||||
writeRequired: false,
|
||||
};
|
||||
this.containers[
|
||||
containerId
|
||||
].timestamp = await this.getContainerSentTimestamp(containerId);
|
||||
this.backfill(containerId, this.containers[containerId].timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
public async detach(containerId: string) {
|
||||
delete this.containers[containerId];
|
||||
await db.models('containerLogs').delete().where({ containerId });
|
||||
}
|
||||
|
||||
private streamLogsFromJournal(
|
||||
options: Parameters<typeof spawnJournalctl>[0],
|
||||
onRow: (row: JournalRow) => void,
|
||||
onError: (data: Buffer) => void,
|
||||
onClose?: () => void,
|
||||
): ReturnType<typeof spawnJournalctl> {
|
||||
const journalctl = spawnJournalctl(options);
|
||||
journalctl.stdout?.pipe(JSONstream.parse(true).on('data', onRow));
|
||||
journalctl.stderr?.on('data', onError);
|
||||
if (onClose) {
|
||||
journalctl.on('close', onClose);
|
||||
}
|
||||
return journalctl;
|
||||
}
|
||||
|
||||
/**
|
||||
* stream logs from lastSentTimestamp until now so logs are not missed if the container started before supervisor
|
||||
*/
|
||||
private backfill(containerId: string, lastSentTimestamp: number) {
|
||||
this.streamLogsFromJournal(
|
||||
{
|
||||
all: true,
|
||||
follow: false,
|
||||
format: 'json',
|
||||
filterString: `CONTAINER_ID_FULL=${containerId}`,
|
||||
since: lastSentTimestamp + 1, // increment to exclude last sent log
|
||||
},
|
||||
(row: JournalRow) => this.handleRow(row),
|
||||
async (data: Buffer) => {
|
||||
log.error('journalctl backfill error: ', data.toString());
|
||||
},
|
||||
() => {
|
||||
this.containers[containerId].follow = true;
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private handleRow(row: JournalRow) {
|
||||
if (row.CONTAINER_ID_FULL && row.CONTAINER_NAME !== 'resin_supervisor') {
|
||||
const containerId = row.CONTAINER_ID_FULL;
|
||||
const message = messageFieldToString(row.MESSAGE);
|
||||
const isStdErr = row.PRIORITY === '3';
|
||||
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
|
||||
if (message != null && this.containers[containerId]) {
|
||||
this.updateContainerSentTimestamp(containerId, timestamp);
|
||||
this.containers[containerId].hook({ message, isStdErr, timestamp });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private updateContainerSentTimestamp(
|
||||
containerId: string,
|
||||
timestamp: number,
|
||||
): void {
|
||||
this.timestamps[containerId] = timestamp;
|
||||
this.writeRequired[containerId] = true;
|
||||
this.containers[containerId].timestamp = timestamp;
|
||||
this.containers[containerId].writeRequired = true;
|
||||
}
|
||||
|
||||
public async getContainerSentTimestamp(containerId: string): Promise<number> {
|
||||
// If this is the first time we are requesting the
|
||||
// timestamp for this container, request it from the db
|
||||
if (this.timestamps[containerId] == null) {
|
||||
// Set the timestamp to 0 before interacting with the
|
||||
// db to avoid multiple db actions at once
|
||||
this.timestamps[containerId] = 0;
|
||||
try {
|
||||
const timestampObj = await db
|
||||
.models('containerLogs')
|
||||
.select('lastSentTimestamp')
|
||||
.where({ containerId });
|
||||
private async getContainerSentTimestamp(
|
||||
containerId: string,
|
||||
): Promise<number> {
|
||||
try {
|
||||
const row = await db
|
||||
.models('containerLogs')
|
||||
.select('lastSentTimestamp')
|
||||
.where({ containerId })
|
||||
.first();
|
||||
|
||||
if (timestampObj == null || _.isEmpty(timestampObj)) {
|
||||
// Create a row in the db so there's something to
|
||||
// update
|
||||
await db
|
||||
.models('containerLogs')
|
||||
.insert({ containerId, lastSentTimestamp: 0 });
|
||||
} else {
|
||||
this.timestamps[containerId] = timestampObj[0].lastSentTimestamp;
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(
|
||||
'There was an error retrieving the container log timestamps:',
|
||||
e,
|
||||
);
|
||||
if (!row) {
|
||||
const now = Date.now();
|
||||
await db
|
||||
.models('containerLogs')
|
||||
.insert({ containerId, lastSentTimestamp: now });
|
||||
return now;
|
||||
} else {
|
||||
return row.lastSentTimestamp;
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(
|
||||
'There was an error retrieving the container log timestamps:',
|
||||
e,
|
||||
);
|
||||
return Date.now();
|
||||
}
|
||||
return this.timestamps[containerId] || 0;
|
||||
}
|
||||
|
||||
private async flushDb() {
|
||||
log.debug('Attempting container log timestamp flush...');
|
||||
const containerIds = Object.getOwnPropertyNames(this.timestamps);
|
||||
try {
|
||||
for (const containerId of containerIds) {
|
||||
for (const containerId of Object.keys(this.containers)) {
|
||||
// Avoid writing to the db if we don't need to
|
||||
if (!this.writeRequired[containerId]) {
|
||||
if (!this.containers[containerId].writeRequired) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await db
|
||||
.models('containerLogs')
|
||||
.where({ containerId })
|
||||
.update({ lastSentTimestamp: this.timestamps[containerId] });
|
||||
this.writeRequired[containerId] = false;
|
||||
await db.models('containerLogs').where({ containerId }).update({
|
||||
lastSentTimestamp: this.containers[containerId].timestamp,
|
||||
});
|
||||
this.containers[containerId].writeRequired = false;
|
||||
}
|
||||
} catch (e) {
|
||||
log.error('There was an error storing the container log timestamps:', e);
|
||||
@ -82,4 +214,6 @@ export class LogMonitor {
|
||||
}
|
||||
}
|
||||
|
||||
export default LogMonitor;
|
||||
const logMonitor = new LogMonitor();
|
||||
|
||||
export default logMonitor;
|
||||
|
@ -14,6 +14,7 @@ import version = require('./lib/supervisor-version');
|
||||
|
||||
import * as avahi from './lib/avahi';
|
||||
import * as firewall from './lib/firewall';
|
||||
import logMonitor from './logging/monitor';
|
||||
|
||||
const startupConfigFields: config.ConfigKey[] = [
|
||||
'uuid',
|
||||
@ -75,6 +76,8 @@ export class Supervisor {
|
||||
deviceState.on('shutdown', () => this.api.stop());
|
||||
|
||||
await apiBinder.start();
|
||||
|
||||
logMonitor.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user