Add ability to stream logs from host services to cloud

Add `os-power-mode.service`, `nvpmodel.service`, and `os-fan-profile.service`
which report status from applying power mode and fan profile configs as read
from config.json. The Supervisor sets these configs in config.json for these
host services to pick up and apply.

Also add host log streaming from `jetson-qspi-manager.service` as it
will very soon be needed for Jetson Orins.

Relates-to: #2379
See: balena-io/open-balena-api#1792
See: balena-os/balena-jetson-orin#513
Change-type: minor
Signed-off-by: Christina Ying Wang <christina@balena.io>
This commit is contained in:
Christina Ying Wang 2024-11-18 13:41:41 -08:00
parent c610710f03
commit fb6fa9b16c
8 changed files with 90 additions and 31 deletions

View File

@ -21,7 +21,7 @@ export interface SpawnJournalctlOpts {
unit?: string;
containerId?: string;
format: string;
filterString?: string;
filter?: string | string[];
since?: string;
until?: string;
}
@ -57,8 +57,16 @@ export function spawnJournalctl(opts: SpawnJournalctlOpts): ChildProcess {
args.push('-o');
args.push(opts.format);
if (opts.filterString) {
args.push(opts.filterString);
if (opts.filter != null) {
// A single filter argument without spaces can be passed as a string
if (typeof opts.filter === 'string') {
args.push(opts.filter);
} else {
// Multiple filter arguments need to be passed as an array of strings
// instead of a single string with spaces, as `spawn` will interpret
// the single string as a single argument to journalctl, which is invalid.
args.push(...opts.filter);
}
}
log.debug('Spawning journalctl', args.join(' '));

View File

@ -6,7 +6,7 @@ import url from 'url';
import zlib from 'zlib';
import { setTimeout } from 'timers/promises';
import type { LogMessage } from './log-backend';
import type { LogMessage } from './types';
import { LogBackend } from './log-backend';
import log from '../lib/supervisor-console';

View File

@ -7,7 +7,8 @@ import type { LogType } from '../lib/log-types';
import { takeGlobalLockRW } from '../lib/process-lock';
import { BalenaLogBackend } from './balena-backend';
import { LocalLogBackend } from './local-backend';
import type { LogBackend, LogMessage } from './log-backend';
import type { LogBackend } from './log-backend';
import type { LogMessage } from './types';
import logMonitor from './monitor';
import * as globalEventBus from '../event-bus';
@ -49,6 +50,8 @@ export const initialized = _.once(async () => {
backend.unmanaged = unmanaged;
backend.publishEnabled = loggingEnabled;
logMonitor.attachSystemLogger(log);
if (!balenaBackend.isInitialised()) {
globalEventBus.getInstance().once('deviceProvisioned', async () => {
const conf = await config.getMany([

View File

@ -2,7 +2,7 @@ import _ from 'lodash';
import { Readable } from 'stream';
import { checkInt } from '../lib/validation';
import type { LogMessage } from './log-backend';
import type { LogMessage } from './types';
import { LogBackend } from './log-backend';
import log from '../lib/supervisor-console';

View File

@ -1,18 +1,4 @@
type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp: number;
};
export type LogMessage = BaseLogMessage &
(
| {
serviceId: number;
isSystem?: false;
}
| {
isSystem: true;
}
);
import type { LogMessage } from './types';
export abstract class LogBackend {
public unmanaged: boolean;

View File

@ -1,14 +1,12 @@
import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';
import { spawnJournalctl, toJournalDate } from '../lib/journald';
import log from '../lib/supervisor-console';
import { setTimeout } from 'timers/promises';
import type { SystemLogMessage, BaseLogMessage } from './types';
export type MonitorHook = (message: {
message: string;
isStdErr: boolean;
timestamp: number;
}) => Resolvable<void>;
type MonitorHook = (message: BaseLogMessage) => Promise<void>;
type SystemMonitorHook = (message: SystemLogMessage) => Promise<void>;
// This is nowhere near the amount of fields provided by journald, but simply the ones
// that we are interested in
@ -18,12 +16,25 @@ interface JournalRow {
MESSAGE: string | number[];
PRIORITY: string;
__REALTIME_TIMESTAMP: string;
_SYSTEMD_UNIT: string;
}
// Wait 5s when journalctl failed before trying to read the logs again
const JOURNALCTL_ERROR_RETRY_DELAY = 5000;
const JOURNALCTL_ERROR_RETRY_DELAY_MAX = 15 * 60 * 1000;
// Additional host services we want to stream the logs for
const HOST_SERVICES = [
// Balena service which applies power mode to config file on boot
'os-power-mode.service',
// Balena service which applies fan profile to device at runtime
'os-fan-profile.service',
// Nvidia power daemon which logs result from applying power mode from config file to device
'nvpmodel.service',
// Runs at boot time and checks if Orin QSPI is accessible after provisioning
'jetson-qspi-manager.service',
];
function messageFieldToString(entry: JournalRow['MESSAGE']): string | null {
if (Array.isArray(entry)) {
return String.fromCharCode(...entry);
@ -60,6 +71,9 @@ class LogMonitor {
hook: MonitorHook;
};
} = {};
private systemHook: SystemMonitorHook = async () => {
/* Default empty hook */
};
private setupAttempts = 0;
// Only stream logs since the start of the supervisor
@ -72,11 +86,16 @@ class LogMonitor {
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
filter: [
// Monitor logs from balenad by default for container log-streaming
'balena.service',
// Add any host services we want to stream
...HOST_SERVICES,
].map((s) => `_SYSTEMD_UNIT=${s}`),
since: toJournalDate(this.lastSentTimestamp),
});
if (!stdout) {
// this will be catched below
// This error will be caught below
throw new Error('failed to open process stream');
}
@ -96,6 +115,8 @@ class LogMonitor {
self.containers[row.CONTAINER_ID_FULL]
) {
await self.handleRow(row);
} else if (HOST_SERVICES.includes(row._SYSTEMD_UNIT)) {
await self.handleHostServiceRow(row);
}
} catch {
// ignore parsing errors
@ -135,6 +156,10 @@ class LogMonitor {
delete this.containers[containerId];
}
public attachSystemLogger(hook: SystemMonitorHook) {
this.systemHook = hook;
}
private async handleRow(row: JournalRow) {
if (
row.CONTAINER_ID_FULL == null ||
@ -151,12 +176,33 @@ class LogMonitor {
if (message == null) {
return;
}
const isStdErr = row.PRIORITY === '3';
const isStdErr = parseInt(row.PRIORITY, 10) <= 3;
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
await this.containers[containerId].hook({ message, isStdErr, timestamp });
await this.containers[containerId].hook({
message,
isStdErr,
timestamp,
});
this.lastSentTimestamp = timestamp;
}
private async handleHostServiceRow(
row: JournalRow & { _SYSTEMD_UNIT: string },
) {
const message = messageFieldToString(row.MESSAGE);
if (message == null) {
return;
}
const isStdErr = parseInt(row.PRIORITY, 10) <= 3;
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
void this.systemHook({
message,
isStdErr,
timestamp,
isSystem: true,
});
}
}
const logMonitor = new LogMonitor();

13
src/logging/types.ts Normal file
View File

@ -0,0 +1,13 @@
export type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp: number;
};
export type SystemLogMessage = BaseLogMessage & {
isSystem: true;
};
type ContainerLogMessage = BaseLogMessage & {
serviceId: number;
isSystem?: false;
};
export type LogMessage = SystemLogMessage | ContainerLogMessage;

View File

@ -28,6 +28,7 @@ describe('lib/journald', () => {
unit: 'nginx.service',
containerId: 'abc123',
format: 'json-pretty',
filter: ['_SYSTEMD_UNIT=test.service', '_SYSTEMD_UNIT=test2.service'],
since: '2014-03-25 03:59:56.654563',
until: '2014-03-25 03:59:59.654563',
});
@ -48,6 +49,8 @@ describe('lib/journald', () => {
'2014-03-25 03:59:56.654563',
'-U',
'2014-03-25 03:59:59.654563',
'_SYSTEMD_UNIT=test.service',
'_SYSTEMD_UNIT=test2.service',
];
const actualCommand = spawn.firstCall.args[0];