Merge pull request #2351 from balena-os/knex-error

Logs processing improvements
flowzone-app[bot] 2024-07-30 18:47:39 +00:00 committed by GitHub
commit dac9a24c10
10 changed files with 228 additions and 305 deletions

View File

@ -11,7 +11,6 @@ import LocalModeManager from '../local-mode';
import * as dbFormat from '../device-state/db-format';
import { validateTargetContracts } from '../lib/contracts';
import * as constants from '../lib/constants';
import { docker } from '../lib/docker-utils';
import log from '../lib/supervisor-console';
import {
ContractViolationError,
@ -90,20 +89,6 @@ export const initialized = _.once(async () => {
await config.initialized();
await imageManager.cleanImageData();
const cleanup = async () => {
const containers = await docker.listContainers({ all: true });
await logger.clearOutOfDateDBLogs(_.map(containers, 'Id'));
};
// Rather than relying on removing out of date database entries when we're no
// longer using them, set a task that runs periodically to clear out the database
// This has the advantage that if for some reason a container is removed while the
// supervisor is down, we won't have zombie entries in the db
// Once a day
setInterval(cleanup, 1000 * 60 * 60 * 24);
// But also run it on startup
await cleanup();
await localModeManager.init();
await serviceManager.attachToRunning();

View File

@ -353,14 +353,13 @@ export async function start(service: Service) {
}
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.start`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.start`,
);
}
void logger.attach(container.id, { serviceId, imageId });
void logger.attach(container.id, { serviceId });
if (!alreadyStarted) {
logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
@ -416,15 +415,13 @@ export function listenToEvents() {
});
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.listenToEvents`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.listenToEvents`,
);
}
void logger.attach(data.id, {
serviceId,
imageId,
});
} else if (status === 'destroy') {
await logMonitor.detach(data.id);
@ -468,10 +465,9 @@ export async function attachToRunning() {
for (const service of services) {
if (service.status === 'Running') {
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.start`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.start`,
);
}
@ -482,7 +478,6 @@ export async function attachToRunning() {
}
void logger.attach(service.containerId, {
serviceId,
imageId,
});
}
}

View File

@ -14,7 +14,7 @@ import log from './supervisor-console';
export const toJournalDate = (timestamp: number): string =>
new Date(timestamp).toISOString().replace(/T/, ' ').replace(/\..+$/, '');
export function spawnJournalctl(opts: {
export interface SpawnJournalctlOpts {
all: boolean;
follow: boolean;
count?: number | 'all';
@ -24,7 +24,9 @@ export function spawnJournalctl(opts: {
filterString?: string;
since?: string;
until?: string;
}): ChildProcess {
}
export function spawnJournalctl(opts: SpawnJournalctlOpts): ChildProcess {
const args: string[] = [];
if (opts.all) {
args.push('-a');
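
The hunk above extracts the inline options type into an exported SpawnJournalctlOpts interface so callers can type their own option objects. A minimal usage sketch, assuming a caller importing from the lib directory (the import path and field values here are illustrative, not part of the diff):

import { spawnJournalctl, toJournalDate, type SpawnJournalctlOpts } from './lib/journald';

// Follow supervisor unit logs as JSON, starting from "now"
const opts: SpawnJournalctlOpts = {
	all: true,
	follow: true,
	format: 'json',
	filterString: '_SYSTEMD_UNIT=balena.service',
	since: toJournalDate(Date.now()),
};

const journalctl = spawnJournalctl(opts);
journalctl.stdout?.on('data', (chunk) => process.stdout.write(chunk));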

View File

@ -2,13 +2,11 @@ import Bluebird from 'bluebird';
import _ from 'lodash';
import * as config from './config';
import * as db from './db';
import * as eventTracker from './event-tracker';
import type { LogType } from './lib/log-types';
import { takeGlobalLockRW } from './lib/process-lock';
import type { LogBackend, LogMessage } from './logging';
import { BalenaLogBackend, LocalLogBackend } from './logging';
import type { MonitorHook } from './logging/monitor';
import logMonitor from './logging/monitor';
import * as globalEventBus from './event-bus';
@ -105,8 +103,8 @@ export function enable(value: boolean = true) {
}
}
export function log(message: LogMessage) {
backend?.log(message);
export async function log(message: LogMessage) {
await backend?.log(message);
}
export function logSystemMessage(
@ -115,11 +113,13 @@ export function logSystemMessage(
eventName?: string,
track: boolean = true,
) {
const msgObj: LogMessage = { message, isSystem: true };
const msgObj: LogMessage = { message, isSystem: true, timestamp: Date.now() };
if (eventObj != null && eventObj.error != null) {
msgObj.isStdErr = true;
}
log(msgObj);
// IMPORTANT: this could potentially create a memory leak if logSystemMessage
// is called too frequently, but we don't want supervisor logging to hold up other tasks
void log(msgObj);
if (track) {
eventTracker.track(
eventName != null ? eventName : message,
@ -134,26 +134,21 @@ export function lock(containerId: string): Bluebird.Disposer<() => void> {
});
}
type ServiceInfo = { serviceId: number; imageId: number };
export function attach(
type ServiceInfo = { serviceId: number };
export async function attach(
containerId: string,
{ serviceId, imageId }: ServiceInfo,
): Bluebird<void> {
{ serviceId }: ServiceInfo,
): Promise<void> {
// First detect if we already have an attached log stream
// for this container
if (logMonitor.isAttached(containerId)) {
return Bluebird.resolve();
return;
}
return Bluebird.using(lock(containerId), async () => {
await logMonitor.attach(
containerId,
(message: Parameters<MonitorHook>[0] & Partial<ServiceInfo>) => {
message.serviceId = serviceId;
message.imageId = imageId;
log(message);
},
);
await logMonitor.attach(containerId, async (message) => {
await log({ ...message, serviceId });
});
});
}
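
With log() now returning a promise, callers choose between fire-and-forget and awaiting, which is the trade-off the IMPORTANT comment above describes. A hedged sketch of both call patterns, assuming the module is imported as `* as logger` as in service-manager (the serviceId value is illustrative):

// Fire-and-forget, as logSystemMessage does; a slow backend cannot block the
// caller, but pending promises can pile up under very heavy logging
void logger.log({ message: 'system event', isSystem: true, timestamp: Date.now() });

// Awaited, as the monitor hook does; backpressure from the backend slows the
// producer instead of growing an unbounded set of pending writes
await logger.log({ message: 'container stdout line', serviceId: 1, timestamp: Date.now() });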
@ -201,16 +196,6 @@ export function logConfigChange(
logSystemMessage(message, obj, eventName);
}
export async function clearOutOfDateDBLogs(containerIds: string[]) {
superConsole.debug(
'Performing database cleanup for container log timestamps',
);
await db
.models('containerLogs')
.whereNotIn('containerId', containerIds)
.delete();
}
function objectNameForLogs(eventObj: LogEventObject): string | null {
if (eventObj == null) {
return null;

View File

@ -4,6 +4,7 @@ import _ from 'lodash';
import stream from 'stream';
import url from 'url';
import zlib from 'zlib';
import { setTimeout } from 'timers/promises';
import type { LogMessage } from './log-backend';
import { LogBackend } from './log-backend';
@ -15,7 +16,6 @@ const MIN_COOLDOWN_PERIOD = 5 * 1000; // 5 seconds
const MAX_COOLDOWN_PERIOD = 300 * 1000; // 5 minutes
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;
const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;
@ -31,7 +31,6 @@ export class BalenaLogBackend extends LogBackend {
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
private timeout: NodeJS.Timeout;
public initialised = false;
@ -61,7 +60,7 @@ export class BalenaLogBackend extends LogBackend {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
this.tryWrite({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
@ -76,15 +75,14 @@ export class BalenaLogBackend extends LogBackend {
return this.initialised;
}
public log(message: LogMessage) {
public async log(message: LogMessage) {
// TODO: Perhaps don't just drop logs when we haven't
// yet initialised (this happens when a device has not yet
// been provisioned)
// TODO: the backend should not be aware of unmanaged or publish state
if (this.unmanaged || !this.publishEnabled || !this.initialised) {
return;
}
if (!_.isObject(message)) {
// Yield control to the event loop
await setTimeout(0);
return;
}
@ -92,15 +90,12 @@ export class BalenaLogBackend extends LogBackend {
return;
}
message.timestamp ??= Date.now();
message.message = message.message
? _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
})
: '';
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
await this.write(message);
}
public assignFields(endpoint: string, uuid: string, deviceApiKey: string) {
@ -117,14 +112,9 @@ export class BalenaLogBackend extends LogBackend {
private lastSetupAttempt = 0;
private setupFailures = 0;
private setupPending = false;
private setup() {
if (this.setupPending || this.req != null) {
// If we already have a setup pending, or we are already setup, then do nothing
return;
}
this.setupPending = true;
private setupPromise: Promise<void> | null = null;
private async trySetup() {
// Work out the total delay we need
const totalDelay = Math.min(
2 ** this.setupFailures * MIN_COOLDOWN_PERIOD,
@ -135,62 +125,83 @@ export class BalenaLogBackend extends LogBackend {
// The difference between the two is the actual delay we want
const delay = Math.max(totalDelay - alreadyDelayedBy, 0);
setTimeout(() => {
this.setupPending = false;
this.lastSetupAttempt = Date.now();
await setTimeout(delay);
const setupFailed = () => {
this.setupFailures++;
this.teardown();
};
this.lastSetupAttempt = Date.now();
this.req = https.request(this.opts);
const setupFailed = () => {
this.setupFailures++;
this.teardown();
};
// Since we haven't sent the request body yet, and never will, the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
log.error(
'LogBackend: server responded with status code:',
res.statusCode,
);
setupFailed();
});
this.req = https.request(this.opts);
this.req.on('timeout', setupFailed);
this.req.on('close', setupFailed);
this.req.on('error', (err) => {
log.error('LogBackend: unexpected error:', err);
setupFailed();
});
// Since we haven't sent the request body yet, and never will, the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
log.error(
'LogBackend: server responded with status code:',
res.statusCode,
);
setupFailed();
});
// Immediately flush the headers. This gives the server a chance to
// respond with potential errors such as a 401 authentication error
this.req.flushHeaders();
this.req.on('timeout', setupFailed);
this.req.on('close', setupFailed);
this.req.on('error', (err) => {
log.error('LogBackend: unexpected error:', err);
setupFailed();
});
// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', setupFailed);
this.gzip.pipe(this.req);
// Immediately flush the headers. This gives the server a chance to
// respond with potential errors such as a 401 authentication error
this.req.flushHeaders();
// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentially lose logs if it turned out that
// the server is unavailable because @_req stream would consume our
// passthrough buffer
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.setupFailures = 0;
this.stream.pipe(this.gzip);
setImmediate(this.flush);
}
}, RESPONSE_GRACE_PERIOD);
}, delay);
// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', setupFailed);
this.gzip.pipe(this.req);
// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentially lose logs if it turned out that
// the server is unavailable because @_req stream would consume our
// passthrough buffer
await setTimeout(RESPONSE_GRACE_PERIOD);
// a teardown could happen while we wait for the grace period so we check
// that gzip is still valid
if (this.gzip != null) {
this.setupFailures = 0;
this.stream.pipe(this.gzip);
setImmediate(this.flush);
}
}
private async setup() {
if (this.req != null) {
// If we are already setup, then do nothing
return;
}
// If the setup is in progress, let callers wait for the existing promise
if (this.setupPromise != null) {
return this.setupPromise;
}
// Store the setup promise in case there are concurrent calls to
// the setup
this.setupPromise = this.trySetup().finally(() => {
this.setupPromise = null;
});
return this.setupPromise;
}
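
The rewritten setup() above stores the in-flight promise so that concurrent writers share a single connection attempt instead of racing to open several. A standalone sketch of the same single-flight idea, with illustrative names:

class SingleFlight {
	private inFlight: Promise<void> | null = null;

	constructor(private readonly doWork: () => Promise<void>) {}

	// Concurrent callers receive the same promise; a new attempt only
	// starts once the previous one has settled
	public run(): Promise<void> {
		if (this.inFlight == null) {
			this.inFlight = this.doWork().finally(() => {
				this.inFlight = null;
			});
		}
		return this.inFlight;
	}
}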
private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
@ -210,30 +221,43 @@ export class BalenaLogBackend extends LogBackend {
private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
this.req.on('error', () => {
/* noop */
});
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
this.gzip = null;
}
this.req = null;
}
}
private write(message: LogMessage) {
private tryWrite(message: LogMessage) {
try {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} catch (e) {
log.error('Failed to write to logging stream, dropping message.', e);
}
}
private async write(message: LogMessage) {
this.snooze();
this.setup();
// Setup could terminate unsuccessfully, at which point
// the messages will get added to the stream until it fills
await this.setup();
if (this.writable) {
try {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} catch (e) {
log.error('Failed to write to logging stream, dropping message.', e);
}
this.tryWrite(message);
} else {
this.dropCount += 1;
// Yield execution to the event loop to avoid
// an aggressive logger overwhelming the process
await setTimeout(0);
}
}
}

View File

@ -1,17 +1,15 @@
type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp?: number;
timestamp: number;
};
export type LogMessage = BaseLogMessage &
(
| {
serviceId?: number;
imageId?: number;
serviceId: number;
isSystem?: false;
}
| {
message: string;
isSystem: true;
}
);
@ -20,7 +18,7 @@ export abstract class LogBackend {
public unmanaged: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
public abstract log(message: LogMessage): Promise<void>;
}
export default LogBackend;
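
After this change the non-system branch of LogMessage requires serviceId and every message requires a timestamp, so consumers can narrow the union on isSystem. A small illustrative consumer (the import path is an assumption):

import type { LogMessage } from './logging/log-backend';

function describeMessage(msg: LogMessage): string {
	if (msg.isSystem === true) {
		// System messages carry no serviceId
		return `[system] ${msg.message}`;
	}
	// Here TypeScript has narrowed msg to the service branch,
	// so msg.serviceId is a number
	return `[service ${msg.serviceId}] ${msg.message}`;
}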

View File

@ -1,6 +1,5 @@
import JSONstream from 'JSONStream';
import { pipeline } from 'stream/promises';
import * as db from '../db';
import { spawnJournalctl, toJournalDate } from '../lib/journald';
import log from '../lib/supervisor-console';
import { setTimeout } from 'timers/promises';
@ -21,9 +20,6 @@ interface JournalRow {
__REALTIME_TIMESTAMP: string;
}
// Flush every 10 mins
const DB_FLUSH_INTERVAL = 10 * 60 * 1000;
// Wait 5s when journalctl failed before trying to read the logs again
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
const JOURNALCTL_ERROR_RETRY_DELAY_MAX = 15 * 60 * 1000;
@ -41,6 +37,20 @@ function messageFieldToString(entry: JournalRow['MESSAGE']): string | null {
}
}
async function* splitStream(chunkIterable: AsyncIterable<any>) {
let previous = '';
for await (const chunk of chunkIterable) {
previous += chunk;
const lines = previous.split(/\r?\n/);
previous = lines.pop() ?? '';
yield* lines;
}
if (previous.length > 0) {
yield previous;
}
}
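
splitStream buffers partial chunks and yields complete lines, which is what lets the monitor below feed journalctl's newline-delimited JSON output through stream.pipeline. A minimal usage sketch with an in-memory source (illustrative only):

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

async function demo() {
	// Chunk boundaries deliberately do not line up with newlines
	const source = Readable.from(['{"a":1}\n{"b"', ':2}\n']);
	await pipeline(source, splitStream, async function (lines) {
		for await (const line of lines) {
			console.log(line); // '{"a":1}' then '{"b":2}'
		}
	});
}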
/**
* Streams logs from journalctl and calls container hooks when a record is received matching container id
*/
@ -48,53 +58,67 @@ class LogMonitor {
private containers: {
[containerId: string]: {
hook: MonitorHook;
follow: boolean;
timestamp: number;
writeRequired: boolean;
};
} = {};
private setupAttempts = 0;
public constructor() {
setInterval(() => this.flushDb(), DB_FLUSH_INTERVAL);
}
// Only stream logs since the start of the supervisor
private lastSentTimestamp = Date.now() - performance.now();
public start() {
this.streamLogsFromJournal(
{
public async start(): Promise<void> {
try {
// TODO: do not spawn journalctl if logging is not enabled
const { stdout, stderr } = spawnJournalctl({
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
},
(row) => {
if (row.CONTAINER_ID_FULL && this.containers[row.CONTAINER_ID_FULL]) {
this.setupAttempts = 0;
this.handleRow(row);
since: toJournalDate(this.lastSentTimestamp),
});
if (!stdout) {
// this will be caught below
throw new Error('failed to open process stream');
}
stderr?.on('data', (data) =>
log.error('journalctl - balena.service stderr: ', data.toString()),
);
const self = this;
await pipeline(stdout, splitStream, async function (lines) {
self.setupAttempts = 0;
for await (const line of lines) {
try {
const row = JSON.parse(line);
if (
row.CONTAINER_ID_FULL &&
self.containers[row.CONTAINER_ID_FULL]
) {
await self.handleRow(row);
}
} catch {
// ignore parsing errors
}
}
},
(data) => {
log.error('journalctl - balena.service stderr: ', data.toString());
},
() => {
// noop for closed
},
async () => {
log.debug('balena.service journalctl process exit.');
// On exit of process try to create another
const wait = Math.min(
2 ** this.setupAttempts++ * JOURNALCTL_ERROR_RETRY_DELAY,
JOURNALCTL_ERROR_RETRY_DELAY_MAX,
);
log.debug(
`Spawning another process to watch balena.service logs in ${
wait / 1000
}s`,
);
await setTimeout(wait);
return this.start();
},
});
log.debug('balena.service journalctl process exit.');
} catch (e: any) {
log.error('journalctl - balena.service error: ', e.message ?? e);
}
// On exit of process try to create another
const wait = Math.min(
2 ** this.setupAttempts++ * JOURNALCTL_ERROR_RETRY_DELAY,
JOURNALCTL_ERROR_RETRY_DELAY_MAX,
);
log.debug(
`Spawning another process to watch balena.service logs in ${
wait / 1000
}s`,
);
await setTimeout(wait);
return this.start();
}
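
The restart loop above backs off exponentially between journalctl respawns. A quick illustration of the resulting schedule, using the constants defined earlier (5 s base delay, 15 min cap):

for (let attempts = 0; attempts < 10; attempts++) {
	const wait = Math.min(2 ** attempts * 5000, 15 * 60 * 1000);
	console.log(attempts, wait / 1000); // 5, 10, 20, 40, 80, 160, 320, 640, 900, 900
}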
public isAttached(containerId: string): boolean {
@ -105,66 +129,15 @@ class LogMonitor {
if (!this.containers[containerId]) {
this.containers[containerId] = {
hook,
follow: false,
timestamp: Date.now(),
writeRequired: false,
};
this.containers[containerId].timestamp =
await this.getContainerSentTimestamp(containerId);
this.backfill(containerId, this.containers[containerId].timestamp);
}
}
public async detach(containerId: string) {
delete this.containers[containerId];
await db.models('containerLogs').delete().where({ containerId });
}
private streamLogsFromJournal(
options: Parameters<typeof spawnJournalctl>[0],
onRow: (row: JournalRow) => void,
onError: (data: Buffer) => void,
onClose?: () => void,
onExit?: () => void,
): ReturnType<typeof spawnJournalctl> {
const journalctl = spawnJournalctl(options);
journalctl.stdout?.pipe(JSONstream.parse(true).on('data', onRow));
journalctl.stderr?.on('data', onError);
if (onClose) {
journalctl.on('close', onClose);
}
if (onExit) {
journalctl.on('exit', onExit);
}
return journalctl;
}
/**
* stream logs from lastSentTimestamp until now so logs are not missed if the container started before supervisor
*/
private backfill(containerId: string, lastSentTimestamp: number) {
this.streamLogsFromJournal(
{
all: true,
follow: false,
format: 'json',
filterString: `CONTAINER_ID_FULL=${containerId}`,
since: toJournalDate(lastSentTimestamp + 1), // increment to exclude last sent log
},
(row) => this.handleRow(row),
(data) => {
log.error(
`journalctl - container ${containerId} stderr: `,
data.toString(),
);
},
() => {
this.containers[containerId].follow = true;
},
);
}
private handleRow(row: JournalRow) {
private async handleRow(row: JournalRow) {
if (
row.CONTAINER_ID_FULL == null ||
row.CONTAINER_NAME === 'balena_supervisor' ||
@ -182,66 +155,9 @@ class LogMonitor {
}
const isStdErr = row.PRIORITY === '3';
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
this.updateContainerSentTimestamp(containerId, timestamp);
// WARNING: this could lead to a memory leak as the hook is not being awaited
// and the journal can be very verbose
void this.containers[containerId].hook({ message, isStdErr, timestamp });
}
private updateContainerSentTimestamp(
containerId: string,
timestamp: number,
): void {
this.containers[containerId].timestamp = timestamp;
this.containers[containerId].writeRequired = true;
}
private async getContainerSentTimestamp(
containerId: string,
): Promise<number> {
try {
const row = await db
.models('containerLogs')
.select('lastSentTimestamp')
.where({ containerId })
.first();
if (!row) {
const now = Date.now();
await db
.models('containerLogs')
.insert({ containerId, lastSentTimestamp: now });
return now;
} else {
return row.lastSentTimestamp;
}
} catch (e) {
log.error(
'There was an error retrieving the container log timestamps:',
e,
);
return Date.now();
}
}
private async flushDb() {
log.debug('Attempting container log timestamp flush...');
try {
for (const containerId of Object.keys(this.containers)) {
// Avoid writing to the db if we don't need to
if (!this.containers[containerId].writeRequired) {
continue;
}
await db.models('containerLogs').where({ containerId }).update({
lastSentTimestamp: this.containers[containerId].timestamp,
});
this.containers[containerId].writeRequired = false;
}
} catch (e) {
log.error('There was an error storing the container log timestamps:', e);
}
log.debug('Container log timestamp flush complete');
await this.containers[containerId].hook({ message, isStdErr, timestamp });
this.lastSentTimestamp = timestamp;
}
}

src/migrations/M00011.js (new file, +10 lines)
View File

@ -0,0 +1,10 @@
export async function up(knex) {
// Drop the container logs table
if (await knex.schema.hasTable('containerLogs')) {
await knex.schema.dropTable('containerLogs');
}
}
export function down() {
throw new Error('Not implemented');
}
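
This migration drops the table that previously persisted per-container log timestamps, which the monitor now tracks in memory via lastSentTimestamp. A hedged sketch of how a knex migration like this is typically applied; the connection settings and directory are assumptions, not the supervisor's actual configuration:

import Knex from 'knex';

async function migrate() {
	const knex = Knex({
		client: 'sqlite3',
		connection: { filename: '/data/database.sqlite' },
		useNullAsDefault: true,
	});
	try {
		// Runs any pending migrations, including M00011
		await knex.migrate.latest({ directory: './src/migrations' });
	} finally {
		await knex.destroy();
	}
}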

View File

@ -84,7 +84,7 @@ export class Supervisor {
apiBinder.start(),
]);
logMonitor.start();
await logMonitor.start();
}
}

View File

@ -52,8 +52,12 @@ describe('Logger', function () {
it('waits the grace period before sending any logs', async function () {
const clock = sinon.useFakeTimers();
logger.log({ message: 'foobar', serviceId: 15 });
clock.tick(4999);
await logger.log({
message: 'foobar',
serviceId: 15,
timestamp: Date.now(),
});
await clock.tickAsync(4999);
clock.restore();
await setTimeout(100);
@ -62,8 +66,12 @@ describe('Logger', function () {
it('tears down the connection after inactivity', async function () {
const clock = sinon.useFakeTimers();
logger.log({ message: 'foobar', serviceId: 15 });
clock.tick(61000);
await logger.log({
message: 'foobar',
serviceId: 15,
timestamp: Date.now(),
});
await clock.tickAsync(61000);
clock.restore();
await setTimeout(100);
@ -72,9 +80,9 @@ describe('Logger', function () {
it('sends logs as gzipped ndjson', async function () {
const timestamp = Date.now();
logger.log({ message: 'foobar', serviceId: 15 });
logger.log({ timestamp: 1337, message: 'foobar', serviceId: 15 });
logger.log({ message: 'foobar' }); // should be ignored
await logger.log({ message: 'foobar', serviceId: 15, timestamp: 1000 });
await logger.log({ timestamp: 1337, message: 'foobar', serviceId: 15 });
await logger.log({ message: 'foobar', isSystem: true, timestamp: 1500 }); // should be ignored
await setTimeout(5500);
expect(this.requestStub.calledOnce).to.be.true;
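
The tests switch from clock.tick to await clock.tickAsync because log() is now asynchronous: tickAsync also lets promise continuations scheduled by the fired timers run before the assertion. A minimal illustrative pattern:

import * as sinon from 'sinon';

async function example() {
	const clock = sinon.useFakeTimers();
	try {
		// tick() fires timers synchronously; tickAsync() additionally drains
		// the microtask queue so awaited work behind those timers completes
		await clock.tickAsync(5000);
	} finally {
		clock.restore();
	}
}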