Merge pull request #2382 from balena-os/support-streaming-host-logs

Stream host logs from os-power-mode.service and os-fan-profile.service
This commit is contained in:
flowzone-app[bot] 2024-12-06 17:52:06 +00:00 committed by GitHub
commit 28a27fc96d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 399 additions and 298 deletions

View File

@ -20,7 +20,7 @@ import log from '../lib/supervisor-console';
import * as deviceState from '../device-state';
import * as globalEventBus from '../event-bus';
import * as TargetState from './poll';
import * as logger from '../logger';
import * as logger from '../logging';
import * as apiHelper from '../lib/api-helper';
import { startReporting, stateReportErrors } from './report';

View File

@ -4,7 +4,7 @@ import type StrictEventEmitter from 'strict-event-emitter-types';
import * as config from '../config';
import type { Transaction } from '../db';
import * as logger from '../logger';
import * as logger from '../logging';
import LocalModeManager from '../local-mode';
import * as dbFormat from '../device-state/db-format';

View File

@ -15,7 +15,7 @@ import {
StatusError,
} from '../lib/errors';
import * as LogTypes from '../lib/log-types';
import * as logger from '../logger';
import * as logger from '../logging';
import { ImageDownloadBackoffError } from './errors';
import type { Service } from './service';

View File

@ -6,7 +6,7 @@ import { isNotFoundError } from '../lib/errors';
import logTypes = require('../lib/log-types');
import log from '../lib/supervisor-console';
import * as logger from '../logger';
import * as logger from '../logging';
import { Network } from './network';
import { ResourceRecreationAttemptError } from './errors';

View File

@ -3,7 +3,7 @@ import type dockerode from 'dockerode';
import { docker } from '../lib/docker-utils';
import logTypes = require('../lib/log-types');
import * as logger from '../logger';
import * as logger from '../logging';
import log from '../lib/supervisor-console';
import * as ComposeUtils from './utils';

View File

@ -8,7 +8,7 @@ import type StrictEventEmitter from 'strict-event-emitter-types';
import * as config from '../config';
import { docker } from '../lib/docker-utils';
import * as logger from '../logger';
import * as logger from '../logging';
import { PermissiveNumber } from '../config/types';
import * as constants from '../lib/constants';

View File

@ -8,7 +8,7 @@ import { pathOnData } from '../lib/host-utils';
import { docker } from '../lib/docker-utils';
import * as LogTypes from '../lib/log-types';
import log from '../lib/supervisor-console';
import * as logger from '../logger';
import * as logger from '../logging';
import { ResourceRecreationAttemptError } from './errors';
import type { VolumeConfig } from './types';
import { Volume } from './volume';

View File

@ -7,7 +7,7 @@ import { docker } from '../lib/docker-utils';
import { InternalInconsistencyError } from '../lib/errors';
import * as LogTypes from '../lib/log-types';
import type { LabelObject } from '../types';
import * as logger from '../logger';
import * as logger from '../logging';
import * as ComposeUtils from './utils';
import type {

View File

@ -7,7 +7,7 @@ import { ConfigBackend } from './backend';
import { exec, exists } from '../../lib/fs-utils';
import * as hostUtils from '../../lib/host-utils';
import * as constants from '../../lib/constants';
import * as logger from '../../logger';
import * as logger from '../../logging';
import log from '../../lib/supervisor-console';
/**

View File

@ -4,7 +4,7 @@ import { getGlobalApiKey, refreshKey } from '../lib/api-keys';
import * as messages from './messages';
import * as eventTracker from '../event-tracker';
import * as deviceState from '../device-state';
import * as logger from '../logger';
import * as logger from '../logging';
import * as config from '../config';
import * as hostConfig from '../host-config';
import type {

View File

@ -12,7 +12,7 @@ import * as commitStore from '../compose/commit';
import * as config from '../config';
import * as db from '../db';
import * as deviceConfig from '../device-config';
import * as logger from '../logger';
import * as logger from '../logging';
import * as images from '../compose/images';
import * as volumeManager from '../compose/volume-manager';
import * as serviceManager from '../compose/service-manager';

View File

@ -4,7 +4,7 @@ import { promises as fs } from 'fs';
import * as config from './config';
import * as db from './db';
import * as logger from './logger';
import * as logger from './logging';
import * as dbus from './lib/dbus';
import type { EnvVarObject } from './types';
import { UnitNotLoadedError } from './lib/errors';

View File

@ -6,7 +6,7 @@ import type StrictEventEmitter from 'strict-event-emitter-types';
import prettyMs from 'pretty-ms';
import * as config from '../config';
import * as logger from '../logger';
import * as logger from '../logging';
import * as network from '../network';
import * as deviceConfig from '../device-config';

View File

@ -4,7 +4,7 @@ import * as config from '../config/index';
import * as constants from './constants';
import * as iptables from './iptables';
import { log } from './supervisor-console';
import { logSystemMessage } from '../logger';
import { logSystemMessage } from '../logging';
import * as dbFormat from '../device-state/db-format';

View File

@ -21,7 +21,7 @@ export interface SpawnJournalctlOpts {
unit?: string;
containerId?: string;
format: string;
filterString?: string;
filter?: string | string[];
since?: string;
until?: string;
}
@ -57,8 +57,16 @@ export function spawnJournalctl(opts: SpawnJournalctlOpts): ChildProcess {
args.push('-o');
args.push(opts.format);
if (opts.filterString) {
args.push(opts.filterString);
if (opts.filter != null) {
// A single filter argument without spaces can be passed as a string
if (typeof opts.filter === 'string') {
args.push(opts.filter);
} else {
// Multiple filter arguments need to be passed as an array of strings
// instead of a single string with spaces, as `spawn` will interpret
// the single string as a single argument to journalctl, which is invalid.
args.push(...opts.filter);
}
}
log.debug('Spawning journalctl', args.join(' '));

View File

@ -14,7 +14,7 @@ import { docker } from './docker-utils';
import { log } from './supervisor-console';
import { pathOnData } from './host-utils';
import type { Volume } from '../compose/volume';
import * as logger from '../logger';
import * as logger from '../logging';
import type {
DatabaseApp,
DatabaseService,

View File

@ -9,7 +9,7 @@ import { mkdirp } from './fs-utils';
import * as lockfile from './lockfile';
import { takeGlobalLockRW } from './process-lock';
import * as logTypes from './log-types';
import * as logger from '../logger';
import * as logger from '../logging';
export const LOCKFILE_UID = 65534;
export const BASE_LOCK_DIR = '/tmp/balena-supervisor/services';

View File

@ -6,7 +6,7 @@ import * as constants from './lib/constants';
import { docker } from './lib/docker-utils';
import { SupervisorContainerNotFoundError } from './lib/errors';
import log from './lib/supervisor-console';
import * as logger from './logger';
import * as logger from './logging';
// EngineSnapshot represents a list of containers, images, volumens, and networks present on the engine.
// A snapshot is taken before entering local mode in order to perform cleanup when we exit local mode.

View File

@ -1,229 +0,0 @@
import Bluebird from 'bluebird';
import _ from 'lodash';
import * as config from './config';
import * as eventTracker from './event-tracker';
import type { LogType } from './lib/log-types';
import { takeGlobalLockRW } from './lib/process-lock';
import type { LogBackend, LogMessage } from './logging';
import { BalenaLogBackend, LocalLogBackend } from './logging';
import logMonitor from './logging/monitor';
import * as globalEventBus from './event-bus';
import superConsole from './lib/supervisor-console';
type LogEventObject = Dictionary<any> | null;
// export class Logger {
let backend: LogBackend | null = null;
let balenaBackend: BalenaLogBackend | null = null;
let localBackend: LocalLogBackend | null = null;
export const initialized = _.once(async () => {
await config.initialized();
const {
apiEndpoint,
logsEndpoint,
uuid,
deviceApiKey,
unmanaged,
loggingEnabled,
localMode,
} = await config.getMany([
'apiEndpoint',
'logsEndpoint',
'uuid',
'deviceApiKey',
'unmanaged',
'loggingEnabled',
'localMode',
]);
balenaBackend = new BalenaLogBackend(
logsEndpoint ?? apiEndpoint,
uuid,
deviceApiKey,
);
localBackend = new LocalLogBackend();
backend = localMode ? localBackend : balenaBackend;
backend.unmanaged = unmanaged;
backend.publishEnabled = loggingEnabled;
if (!balenaBackend.isInitialised()) {
globalEventBus.getInstance().once('deviceProvisioned', async () => {
const conf = await config.getMany([
'uuid',
'apiEndpoint',
'logsEndpoint',
'deviceApiKey',
]);
// We use Boolean here, as deviceApiKey when unset
// is '' for legacy reasons. Once we're totally
// typescript, we can make it have a default value
// of undefined.
if (_.every(conf, Boolean)) {
// Everything is set, provide the values to the
// balenaBackend, and remove our listener
balenaBackend!.assignFields(
conf.logsEndpoint ?? conf.apiEndpoint,
conf.uuid!,
conf.deviceApiKey,
);
}
});
}
});
export function switchBackend(localMode: boolean) {
if (localMode) {
// Use the local mode backend
backend = localBackend;
superConsole.info('Switching logging backend to LocalLogBackend');
} else {
// Use the balena backend
backend = balenaBackend;
superConsole.info('Switching logging backend to BalenaLogBackend');
}
}
export function getLocalBackend(): LocalLogBackend {
// TODO: Think about this interface a little better, it would be
// nicer to proxy the logs via the logger module
if (localBackend == null) {
// TODO: Type this as an internal inconsistency error
throw new Error('Local backend logger is not defined.');
}
return localBackend;
}
export function enable(value: boolean = true) {
if (backend != null) {
backend.publishEnabled = value;
}
}
export async function log(message: LogMessage) {
await backend?.log(message);
}
export function logSystemMessage(
message: string,
eventObj?: LogEventObject,
eventName?: string,
track: boolean = true,
) {
const msgObj: LogMessage = { message, isSystem: true, timestamp: Date.now() };
if (eventObj != null && eventObj.error != null) {
msgObj.isStdErr = true;
}
// IMPORTANT: this could potentially create a memory leak if logSystemMessage
// is used too quickly but we don't want supervisor logging to hold up other tasks
void log(msgObj);
if (track) {
eventTracker.track(
eventName != null ? eventName : message,
eventObj != null ? eventObj : {},
);
}
}
export function lock(containerId: string): Bluebird.Disposer<() => void> {
return takeGlobalLockRW(containerId).disposer((release) => {
release();
});
}
type ServiceInfo = { serviceId: number };
export async function attach(
containerId: string,
{ serviceId }: ServiceInfo,
): Promise<void> {
// First detect if we already have an attached log stream
// for this container
if (logMonitor.isAttached(containerId)) {
return;
}
return Bluebird.using(lock(containerId), async () => {
await logMonitor.attach(containerId, async (message) => {
await log({ ...message, serviceId });
});
});
}
export function logSystemEvent(
logType: LogType,
obj: LogEventObject,
track: boolean = true,
): void {
let message = logType.humanName;
const objectName = objectNameForLogs(obj);
if (objectName != null) {
message += ` '${objectName}'`;
}
if (obj && obj.error != null) {
let errorMessage = obj.error.message;
if (_.isEmpty(errorMessage)) {
errorMessage =
obj.error.name !== 'Error' ? obj.error.name : 'Unknown cause';
superConsole.warn('Invalid error message', obj.error);
}
message += ` due to '${errorMessage}'`;
}
logSystemMessage(message, obj, logType.eventName, track);
}
export function logConfigChange(
conf: { [configName: string]: string },
{ success = false, err }: { success?: boolean; err?: Error } = {},
) {
const obj: LogEventObject = { conf };
let message: string;
let eventName: string;
if (success) {
message = `Applied configuration change ${JSON.stringify(conf)}`;
eventName = 'Apply config change success';
} else if (err != null) {
message = `Error applying configuration change: ${err}`;
eventName = 'Apply config change error';
obj.error = err;
} else {
message = `Applying configuration change ${JSON.stringify(conf)}`;
eventName = 'Apply config change in progress';
}
logSystemMessage(message, obj, eventName);
}
function objectNameForLogs(eventObj: LogEventObject): string | null {
if (eventObj == null) {
return null;
}
if (
eventObj.service != null &&
eventObj.service.serviceName != null &&
eventObj.service.config != null &&
eventObj.service.config.image != null
) {
return `${eventObj.service.serviceName} ${eventObj.service.config.image}`;
}
if (eventObj.image != null) {
return eventObj.image.name;
}
if (eventObj.network != null && eventObj.network.name != null) {
return eventObj.network.name;
}
if (eventObj.volume != null && eventObj.volume.name != null) {
return eventObj.volume.name;
}
if (eventObj.fields != null) {
return eventObj.fields.join(',');
}
return null;
}

View File

@ -6,7 +6,7 @@ import url from 'url';
import zlib from 'zlib';
import { setTimeout } from 'timers/promises';
import type { LogMessage } from './log-backend';
import type { LogMessage } from './types';
import { LogBackend } from './log-backend';
import log from '../lib/supervisor-console';

View File

@ -1,5 +1,232 @@
import Bluebird from 'bluebird';
import _ from 'lodash';
import * as config from '../config';
import * as eventTracker from '../event-tracker';
import type { LogType } from '../lib/log-types';
import { takeGlobalLockRW } from '../lib/process-lock';
import { BalenaLogBackend } from './balena-backend';
import { LocalLogBackend } from './local-backend';
import { LogBackend, LogMessage } from './log-backend';
import type { LogBackend } from './log-backend';
import type { LogMessage } from './types';
import logMonitor from './monitor';
export { LocalLogBackend, LogBackend, LogMessage, BalenaLogBackend };
import * as globalEventBus from '../event-bus';
import superConsole from '../lib/supervisor-console';
type LogEventObject = Dictionary<any> | null;
let backend: LogBackend | null = null;
let balenaBackend: BalenaLogBackend | null = null;
let localBackend: LocalLogBackend | null = null;
export const initialized = _.once(async () => {
await config.initialized();
const {
apiEndpoint,
logsEndpoint,
uuid,
deviceApiKey,
unmanaged,
loggingEnabled,
localMode,
} = await config.getMany([
'apiEndpoint',
'logsEndpoint',
'uuid',
'deviceApiKey',
'unmanaged',
'loggingEnabled',
'localMode',
]);
balenaBackend = new BalenaLogBackend(
logsEndpoint ?? apiEndpoint,
uuid,
deviceApiKey,
);
localBackend = new LocalLogBackend();
backend = localMode ? localBackend : balenaBackend;
backend.unmanaged = unmanaged;
backend.publishEnabled = loggingEnabled;
logMonitor.attachSystemLogger(log);
if (!balenaBackend.isInitialised()) {
globalEventBus.getInstance().once('deviceProvisioned', async () => {
const conf = await config.getMany([
'uuid',
'apiEndpoint',
'logsEndpoint',
'deviceApiKey',
]);
// We use Boolean here, as deviceApiKey when unset
// is '' for legacy reasons. Once we're totally
// typescript, we can make it have a default value
// of undefined.
if (_.every(conf, Boolean)) {
// Everything is set, provide the values to the
// balenaBackend, and remove our listener
balenaBackend!.assignFields(
conf.logsEndpoint ?? conf.apiEndpoint,
conf.uuid!,
conf.deviceApiKey,
);
}
});
}
});
export function switchBackend(localMode: boolean) {
if (localMode) {
// Use the local mode backend
backend = localBackend;
superConsole.info('Switching logging backend to LocalLogBackend');
} else {
// Use the balena backend
backend = balenaBackend;
superConsole.info('Switching logging backend to BalenaLogBackend');
}
}
export function getLocalBackend(): LocalLogBackend {
// TODO: Think about this interface a little better, it would be
// nicer to proxy the logs via the logger module
if (localBackend == null) {
// TODO: Type this as an internal inconsistency error
throw new Error('Local backend logger is not defined.');
}
return localBackend;
}
export function enable(value: boolean = true) {
if (backend != null) {
backend.publishEnabled = value;
}
}
export async function log(message: LogMessage) {
await backend?.log(message);
}
export function logSystemMessage(
message: string,
eventObj?: LogEventObject,
eventName?: string,
track: boolean = true,
) {
const msgObj: LogMessage = { message, isSystem: true, timestamp: Date.now() };
if (eventObj != null && eventObj.error != null) {
msgObj.isStdErr = true;
}
// IMPORTANT: this could potentially create a memory leak if logSystemMessage
// is used too quickly but we don't want supervisor logging to hold up other tasks
void log(msgObj);
if (track) {
eventTracker.track(
eventName != null ? eventName : message,
eventObj != null ? eventObj : {},
);
}
}
function lock(containerId: string): Bluebird.Disposer<() => void> {
return takeGlobalLockRW(containerId).disposer((release) => {
release();
});
}
type ServiceInfo = { serviceId: number };
export async function attach(
containerId: string,
{ serviceId }: ServiceInfo,
): Promise<void> {
// First detect if we already have an attached log stream
// for this container
if (logMonitor.isAttached(containerId)) {
return;
}
return Bluebird.using(lock(containerId), async () => {
await logMonitor.attach(containerId, async (message) => {
await log({ ...message, serviceId });
});
});
}
export function logSystemEvent(
logType: LogType,
obj: LogEventObject,
track: boolean = true,
): void {
let message = logType.humanName;
const objectName = objectNameForLogs(obj);
if (objectName != null) {
message += ` '${objectName}'`;
}
if (obj && obj.error != null) {
let errorMessage = obj.error.message;
if (_.isEmpty(errorMessage)) {
errorMessage =
obj.error.name !== 'Error' ? obj.error.name : 'Unknown cause';
superConsole.warn('Invalid error message', obj.error);
}
message += ` due to '${errorMessage}'`;
}
logSystemMessage(message, obj, logType.eventName, track);
}
export function logConfigChange(
conf: { [configName: string]: string },
{ success = false, err }: { success?: boolean; err?: Error } = {},
) {
const obj: LogEventObject = { conf };
let message: string;
let eventName: string;
if (success) {
message = `Applied configuration change ${JSON.stringify(conf)}`;
eventName = 'Apply config change success';
} else if (err != null) {
message = `Error applying configuration change: ${err}`;
eventName = 'Apply config change error';
obj.error = err;
} else {
message = `Applying configuration change ${JSON.stringify(conf)}`;
eventName = 'Apply config change in progress';
}
logSystemMessage(message, obj, eventName);
}
function objectNameForLogs(eventObj: LogEventObject): string | null {
if (eventObj == null) {
return null;
}
if (
eventObj.service != null &&
eventObj.service.serviceName != null &&
eventObj.service.config != null &&
eventObj.service.config.image != null
) {
return `${eventObj.service.serviceName} ${eventObj.service.config.image}`;
}
if (eventObj.image != null) {
return eventObj.image.name;
}
if (eventObj.network != null && eventObj.network.name != null) {
return eventObj.network.name;
}
if (eventObj.volume != null && eventObj.volume.name != null) {
return eventObj.volume.name;
}
if (eventObj.fields != null) {
return eventObj.fields.join(',');
}
return null;
}

View File

@ -2,7 +2,7 @@ import _ from 'lodash';
import { Readable } from 'stream';
import { checkInt } from '../lib/validation';
import type { LogMessage } from './log-backend';
import type { LogMessage } from './types';
import { LogBackend } from './log-backend';
import log from '../lib/supervisor-console';

View File

@ -1,18 +1,4 @@
type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp: number;
};
export type LogMessage = BaseLogMessage &
(
| {
serviceId: number;
isSystem?: false;
}
| {
isSystem: true;
}
);
import type { LogMessage } from './types';
export abstract class LogBackend {
public unmanaged: boolean;

View File

@ -1,14 +1,15 @@
import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';
import type { ContainerInspectInfo } from 'dockerode';
import { spawnJournalctl, toJournalDate } from '../lib/journald';
import log from '../lib/supervisor-console';
import { setTimeout } from 'timers/promises';
import { docker } from '../lib/docker-utils';
import type { SpawnJournalctlOpts } from '../lib/journald';
import type { SystemLogMessage, BaseLogMessage } from './types';
export type MonitorHook = (message: {
message: string;
isStdErr: boolean;
timestamp: number;
}) => Resolvable<void>;
type MonitorHook = (message: BaseLogMessage) => Promise<void>;
type SystemMonitorHook = (message: SystemLogMessage) => Promise<void>;
// This is nowhere near the amount of fields provided by journald, but simply the ones
// that we are interested in
@ -18,12 +19,25 @@ interface JournalRow {
MESSAGE: string | number[];
PRIORITY: string;
__REALTIME_TIMESTAMP: string;
_SYSTEMD_UNIT: string;
}
// Wait 5s when journalctl failed before trying to read the logs again
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
const JOURNALCTL_ERROR_RETRY_DELAY_MAX = 15 * 60 * 1000;
// Additional host services we want to stream the logs for
const HOST_SERVICES = [
// Balena service which applies power mode to config file on boot
'os-power-mode.service',
// Balena service which applies fan profile to device at runtime
'os-fan-profile.service',
// Nvidia power daemon which logs result from applying power mode from config file to device
'nvpmodel.service',
// Runs at boot time and checks if Orin QSPI is accessible after provisioning
'jetson-qspi-manager.service',
];
function messageFieldToString(entry: JournalRow['MESSAGE']): string | null {
if (Array.isArray(entry)) {
return String.fromCharCode(...entry);
@ -51,6 +65,19 @@ async function* splitStream(chunkIterable: AsyncIterable<any>) {
}
}
const getSupervisorContainer =
async (): Promise<ContainerInspectInfo | null> => {
try {
return await Promise.any([
docker.getContainer('balena_supervisor').inspect(),
docker.getContainer('resin_supervisor').inspect(),
]);
} catch {
// If all promises reject, return null
return null;
}
};
/**
* Streams logs from journalctl and calls container hooks when a record is received matching container id
*/
@ -60,28 +87,29 @@ class LogMonitor {
hook: MonitorHook;
};
} = {};
private systemHook: SystemMonitorHook = async () => {
/* Default empty hook */
};
private setupAttempts = 0;
// Only stream logs since the start of the supervisor
private lastSentTimestamp = Date.now() - performance.now();
// By default, only stream logs since the start of the Supervisor process
private lastSentTimestamp: number | null = null;
public async start(): Promise<void> {
// Get journalctl spawn options
const opts = await this.getJournalctlOptions();
// Spawn journalctl process to stream logs
try {
// TODO: do not spawn journalctl if logging is not enabled
const { stdout, stderr } = spawnJournalctl({
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
since: toJournalDate(this.lastSentTimestamp),
});
const { stdout, stderr } = spawnJournalctl(opts);
if (!stdout) {
// this will be catched below
// This error will be caught below
throw new Error('failed to open process stream');
}
stderr?.on('data', (data) =>
log.error('journalctl - balena.service stderr: ', data.toString()),
log.error('Journalctl process stderr: ', data.toString()),
);
const self = this;
@ -96,15 +124,17 @@ class LogMonitor {
self.containers[row.CONTAINER_ID_FULL]
) {
await self.handleRow(row);
} else if (HOST_SERVICES.includes(row._SYSTEMD_UNIT)) {
await self.handleHostServiceRow(row);
}
} catch {
// ignore parsing errors
}
}
});
log.debug('balena.service journalctl process exit.');
log.debug('Journalctl process exit.');
} catch (e: any) {
log.error('journalctl - balena.service error: ', e.message ?? e);
log.error('Journalctl process error: ', e.message ?? e);
}
// On exit of process try to create another
@ -113,14 +143,52 @@ class LogMonitor {
JOURNALCTL_ERROR_RETRY_DELAY_MAX,
);
log.debug(
`Spawning another process to watch balena.service logs in ${
wait / 1000
}s`,
`Spawning another process to watch journal logs in ${wait / 1000}s`,
);
await setTimeout(wait);
void this.start();
}
private async getJournalctlOptions(): Promise<SpawnJournalctlOpts> {
// On SV start, journalctl is spawned with a timestamp to only
// get logs since the last Supervisor State.FinishedAt. This will catch any
// host and container logs generated while the Supervisor was not running.
const supervisorContainer = await getSupervisorContainer();
if (supervisorContainer !== null) {
const finishedAt = supervisorContainer.State.FinishedAt;
const finishedAtDate = new Date(finishedAt).getTime();
// When a container has never exited with any exit code,
// the FinishedAt timestamp is "0001-01-01T00:00:00Z". Any
// timestamp below 0 in ms value is from before the epoch.
// Only set the lastSentTimestamp to the last Supervisor State.FinishedAt if:
// - finishedAtDate is greater than 0 (i.e. the supervisor container has exited at least once)
// - lastSentTimestamp is null (i.e. this is the first time we've started the monitor)
// - This prevents the case of the logs getting streamed from State.FinishedAt for
// subsequent monitor.start() calls due to the underlying journalctl process dying.
if (finishedAtDate > 0 && this.lastSentTimestamp == null) {
this.lastSentTimestamp = finishedAtDate;
}
}
// If the conditions weren't met to set the lastSentTimestamp, use the process uptime
if (this.lastSentTimestamp == null) {
this.lastSentTimestamp = Date.now() - performance.now();
}
return {
all: true,
follow: true,
format: 'json',
filter: [
// Monitor logs from balenad by default for container log-streaming
'balena.service',
// Add any host services we want to stream
...HOST_SERVICES,
].map((s) => `_SYSTEMD_UNIT=${s}`),
since: toJournalDate(this.lastSentTimestamp),
};
}
public isAttached(containerId: string): boolean {
return containerId in this.containers;
}
@ -137,6 +205,10 @@ class LogMonitor {
delete this.containers[containerId];
}
public attachSystemLogger(hook: SystemMonitorHook) {
this.systemHook = hook;
}
private async handleRow(row: JournalRow) {
if (
row.CONTAINER_ID_FULL == null ||
@ -153,12 +225,33 @@ class LogMonitor {
if (message == null) {
return;
}
const isStdErr = row.PRIORITY === '3';
const isStdErr = parseInt(row.PRIORITY, 10) <= 3;
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
await this.containers[containerId].hook({ message, isStdErr, timestamp });
await this.containers[containerId].hook({
message,
isStdErr,
timestamp,
});
this.lastSentTimestamp = timestamp;
}
private async handleHostServiceRow(
row: JournalRow & { _SYSTEMD_UNIT: string },
) {
const message = messageFieldToString(row.MESSAGE);
if (message == null) {
return;
}
const isStdErr = parseInt(row.PRIORITY, 10) <= 3;
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
void this.systemHook({
message,
isStdErr,
timestamp,
isSystem: true,
});
}
}
const logMonitor = new LogMonitor();

13
src/logging/types.ts Normal file
View File

@ -0,0 +1,13 @@
export type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp: number;
};
export type SystemLogMessage = BaseLogMessage & {
isSystem: true;
};
type ContainerLogMessage = BaseLogMessage & {
serviceId: number;
isSystem?: false;
};
export type LogMessage = SystemLogMessage | ContainerLogMessage;

View File

@ -2,7 +2,7 @@ import * as apiBinder from './api-binder';
import * as db from './db';
import * as config from './config';
import * as deviceState from './device-state';
import * as logger from './logger';
import * as logger from './logging';
import SupervisorAPI from './device-api';
import * as v1 from './device-api/v1';
import * as v2 from './device-api/v2';

View File

@ -3,7 +3,7 @@ import type { SinonStub } from 'sinon';
import { stub } from 'sinon';
import { Volume } from '~/src/compose/volume';
import * as logTypes from '~/lib/log-types';
import * as logger from '~/src/logger';
import * as logger from '~/src/logging';
import Docker from 'dockerode';

View File

@ -7,7 +7,7 @@ import { expect } from 'chai';
import * as deviceConfig from '~/src/device-config';
import * as fsUtils from '~/lib/fs-utils';
import * as logger from '~/src/logger';
import * as logger from '~/src/logging';
import { Extlinux } from '~/src/config/backends/extlinux';
import { ConfigTxt } from '~/src/config/backends/config-txt';
import { Odmdata } from '~/src/config/backends/odmdata';

View File

@ -3,7 +3,7 @@ import { expect } from 'chai';
import sinon from 'sinon';
import * as config from '~/src/config';
import * as logger from '~/src/logger';
import * as logger from '~/src/logging';
import * as iptablesMock from '~/test-lib/mocked-iptables';
import * as dbFormat from '~/src/device-state/db-format';

View File

@ -9,7 +9,7 @@ import { setTimeout } from 'timers/promises';
import * as config from '~/src/config';
describe('Logger', function () {
let logger: typeof import('~/src/logger');
let logger: typeof import('~/src/logging');
let configStub: sinon.SinonStub;
beforeEach(async function () {

View File

@ -14,7 +14,7 @@ import * as images from '~/src/compose/images';
import * as config from '~/src/config';
import * as mockedDockerode from '~/test-lib/mocked-dockerode';
import * as applicationManager from '~/src/compose/application-manager';
import * as logger from '~/src/logger';
import * as logger from '~/src/logging';
describe('SupervisorAPI [V2 Endpoints]', () => {
let serviceManagerMock: SinonStub;

View File

@ -28,6 +28,7 @@ describe('lib/journald', () => {
unit: 'nginx.service',
containerId: 'abc123',
format: 'json-pretty',
filter: ['_SYSTEMD_UNIT=test.service', '_SYSTEMD_UNIT=test2.service'],
since: '2014-03-25 03:59:56.654563',
until: '2014-03-25 03:59:59.654563',
});
@ -48,6 +49,8 @@ describe('lib/journald', () => {
'2014-03-25 03:59:56.654563',
'-U',
'2014-03-25 03:59:59.654563',
'_SYSTEMD_UNIT=test.service',
'_SYSTEMD_UNIT=test2.service',
];
const actualCommand = spawn.firstCall.args[0];