Do not use DB to store container logs info

This removes the dependence of the supervisor on the containerLogs
database for remembering the last sent timestamp. This commit instead
uses the supervisor startup time as the initial time for log retrieval.
This might result in some logs missing for services that may start
before the supervisor after a boot, or if the supervisor restarts.
However this seems like an acceptable trade-off as the current
implementation seems to make things worse in resource-constrained
environments.

We'll move storing the last sent timestamp to a better storage medium in
a future commit.

Change-type: minor
This commit is contained in:
Felipe Lalanne 2024-07-10 15:19:48 -04:00
parent 4a8bf14196
commit dbacca977a
No known key found for this signature in database
GPG Key ID: 03E696BFD472B26A
5 changed files with 27 additions and 151 deletions

View File

@ -11,7 +11,6 @@ import LocalModeManager from '../local-mode';
import * as dbFormat from '../device-state/db-format';
import { validateTargetContracts } from '../lib/contracts';
import * as constants from '../lib/constants';
import { docker } from '../lib/docker-utils';
import log from '../lib/supervisor-console';
import {
ContractViolationError,
@ -90,20 +89,6 @@ export const initialized = _.once(async () => {
await config.initialized();
await imageManager.cleanImageData();
const cleanup = async () => {
const containers = await docker.listContainers({ all: true });
await logger.clearOutOfDateDBLogs(_.map(containers, 'Id'));
};
// Rather than relying on removing out of date database entries when we're no
// longer using them, set a task that runs periodically to clear out the database
// This has the advantage that if for some reason a container is removed while the
// supervisor is down, we won't have zombie entries in the db
// Once a day
setInterval(cleanup, 1000 * 60 * 60 * 24);
// But also run it in on startup
await cleanup();
await localModeManager.init();
await serviceManager.attachToRunning();

View File

@ -14,7 +14,7 @@ import log from './supervisor-console';
export const toJournalDate = (timestamp: number): string =>
new Date(timestamp).toISOString().replace(/T/, ' ').replace(/\..+$/, '');
export function spawnJournalctl(opts: {
export interface SpawnJournalctlOpts {
all: boolean;
follow: boolean;
count?: number | 'all';
@ -24,7 +24,9 @@ export function spawnJournalctl(opts: {
filterString?: string;
since?: string;
until?: string;
}): ChildProcess {
}
export function spawnJournalctl(opts: SpawnJournalctlOpts): ChildProcess {
const args: string[] = [];
if (opts.all) {
args.push('-a');

View File

@ -2,13 +2,11 @@ import Bluebird from 'bluebird';
import _ from 'lodash';
import * as config from './config';
import * as db from './db';
import * as eventTracker from './event-tracker';
import type { LogType } from './lib/log-types';
import { takeGlobalLockRW } from './lib/process-lock';
import type { LogBackend, LogMessage } from './logging';
import { BalenaLogBackend, LocalLogBackend } from './logging';
import type { MonitorHook } from './logging/monitor';
import logMonitor from './logging/monitor';
import * as globalEventBus from './event-bus';
@ -146,14 +144,9 @@ export function attach(
}
return Bluebird.using(lock(containerId), async () => {
await logMonitor.attach(
containerId,
(message: Parameters<MonitorHook>[0] & Partial<ServiceInfo>) => {
message.serviceId = serviceId;
message.imageId = imageId;
log(message);
},
);
await logMonitor.attach(containerId, (message) => {
log({ ...message, serviceId, imageId });
});
});
}
@ -201,16 +194,6 @@ export function logConfigChange(
logSystemMessage(message, obj, eventName);
}
export async function clearOutOfDateDBLogs(containerIds: string[]) {
superConsole.debug(
'Performing database cleanup for container log timestamps',
);
await db
.models('containerLogs')
.whereNotIn('containerId', containerIds)
.delete();
}
function objectNameForLogs(eventObj: LogEventObject): string | null {
if (eventObj == null) {
return null;

View File

@ -1,6 +1,6 @@
import JSONstream from 'JSONStream';
import * as db from '../db';
import type { SpawnJournalctlOpts } from '../lib/journald';
import { spawnJournalctl, toJournalDate } from '../lib/journald';
import log from '../lib/supervisor-console';
import { setTimeout } from 'timers/promises';
@ -9,7 +9,7 @@ export type MonitorHook = (message: {
message: string;
isStdErr: boolean;
timestamp: number;
}) => Resolvable<void>;
}) => void;
// This is nowhere near the amount of fields provided by journald, but simply the ones
// that we are interested in
@ -21,9 +21,6 @@ interface JournalRow {
__REALTIME_TIMESTAMP: string;
}
// Flush every 10 mins
const DB_FLUSH_INTERVAL = 10 * 60 * 1000;
// Wait 5s when journalctl failed before trying to read the logs again
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
const JOURNALCTL_ERROR_RETRY_DELAY_MAX = 15 * 60 * 1000;
@ -48,16 +45,12 @@ class LogMonitor {
private containers: {
[containerId: string]: {
hook: MonitorHook;
follow: boolean;
timestamp: number;
writeRequired: boolean;
};
} = {};
private setupAttempts = 0;
public constructor() {
setInterval(() => this.flushDb(), DB_FLUSH_INTERVAL);
}
// Only stream logs since the start of the supervisor
private lastSentTimestamp = Date.now() - performance.now();
public start() {
this.streamLogsFromJournal(
@ -66,6 +59,7 @@ class LogMonitor {
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
since: toJournalDate(this.lastSentTimestamp),
},
(row) => {
if (row.CONTAINER_ID_FULL && this.containers[row.CONTAINER_ID_FULL]) {
@ -76,9 +70,6 @@ class LogMonitor {
(data) => {
log.error('journalctl - balena.service stderr: ', data.toString());
},
() => {
// noop for closed
},
async () => {
log.debug('balena.service journalctl process exit.');
// On exit of process try to create another
@ -105,65 +96,27 @@ class LogMonitor {
if (!this.containers[containerId]) {
this.containers[containerId] = {
hook,
follow: false,
timestamp: Date.now(),
writeRequired: false,
};
this.containers[containerId].timestamp =
await this.getContainerSentTimestamp(containerId);
this.backfill(containerId, this.containers[containerId].timestamp);
}
}
public async detach(containerId: string) {
delete this.containers[containerId];
await db.models('containerLogs').delete().where({ containerId });
}
private streamLogsFromJournal(
options: Parameters<typeof spawnJournalctl>[0],
options: SpawnJournalctlOpts,
onRow: (row: JournalRow) => void,
onError: (data: Buffer) => void,
onClose?: () => void,
onExit?: () => void,
onExit: () => void,
): ReturnType<typeof spawnJournalctl> {
const journalctl = spawnJournalctl(options);
journalctl.stdout?.pipe(JSONstream.parse(true).on('data', onRow));
journalctl.stderr?.on('data', onError);
if (onClose) {
journalctl.on('close', onClose);
}
if (onExit) {
journalctl.on('exit', onExit);
}
journalctl.on('exit', onExit);
return journalctl;
}
/**
* stream logs from lastSentTimestamp until now so logs are not missed if the container started before supervisor
*/
private backfill(containerId: string, lastSentTimestamp: number) {
this.streamLogsFromJournal(
{
all: true,
follow: false,
format: 'json',
filterString: `CONTAINER_ID_FULL=${containerId}`,
since: toJournalDate(lastSentTimestamp + 1), // increment to exclude last sent log
},
(row) => this.handleRow(row),
(data) => {
log.error(
`journalctl - container ${containerId} stderr: `,
data.toString(),
);
},
() => {
this.containers[containerId].follow = true;
},
);
}
private handleRow(row: JournalRow) {
if (
row.CONTAINER_ID_FULL == null ||
@ -182,66 +135,9 @@ class LogMonitor {
}
const isStdErr = row.PRIORITY === '3';
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
this.updateContainerSentTimestamp(containerId, timestamp);
// WARNING: this could lead to a memory leak as the hook is not being awaited
// and the journal can be very verbose
void this.containers[containerId].hook({ message, isStdErr, timestamp });
}
private updateContainerSentTimestamp(
containerId: string,
timestamp: number,
): void {
this.containers[containerId].timestamp = timestamp;
this.containers[containerId].writeRequired = true;
}
private async getContainerSentTimestamp(
containerId: string,
): Promise<number> {
try {
const row = await db
.models('containerLogs')
.select('lastSentTimestamp')
.where({ containerId })
.first();
if (!row) {
const now = Date.now();
await db
.models('containerLogs')
.insert({ containerId, lastSentTimestamp: now });
return now;
} else {
return row.lastSentTimestamp;
}
} catch (e) {
log.error(
'There was an error retrieving the container log timestamps:',
e,
);
return Date.now();
}
}
private async flushDb() {
log.debug('Attempting container log timestamp flush...');
try {
for (const containerId of Object.keys(this.containers)) {
// Avoid writing to the db if we don't need to
if (!this.containers[containerId].writeRequired) {
continue;
}
await db.models('containerLogs').where({ containerId }).update({
lastSentTimestamp: this.containers[containerId].timestamp,
});
this.containers[containerId].writeRequired = false;
}
} catch (e) {
log.error('There was an error storing the container log timestamps:', e);
}
log.debug('Container log timestamp flush complete');
this.containers[containerId].hook({ message, isStdErr, timestamp });
this.lastSentTimestamp = timestamp;
}
}

10
src/migrations/M00011.js Normal file
View File

@ -0,0 +1,10 @@
export async function up(knex) {
// Drop the container logs table
if (await knex.schema.hasTable('containerLogs')) {
await knex.schema.dropTable('containerLogs');
}
}
export function down() {
throw new Error('Not implemented');
}