Improve the LogBackend interface

This make the LogBackend `log` method into an async method in
preparation for upcoming changes that will use backpressure from the
connection to delay logging coming from containers.

This also removes unnecessary imageId from the LogMessage type

Change-type: patch
This commit is contained in:
Felipe Lalanne 2024-07-22 16:02:20 -04:00
parent 5af948483a
commit f3fcb0db7a
No known key found for this signature in database
GPG Key ID: 03E696BFD472B26A
6 changed files with 49 additions and 47 deletions

View File

@ -353,14 +353,13 @@ export async function start(service: Service) {
}
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.start`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.start`,
);
}
void logger.attach(container.id, { serviceId, imageId });
void logger.attach(container.id, { serviceId });
if (!alreadyStarted) {
logger.logSystemEvent(LogTypes.startServiceSuccess, { service });
@ -416,15 +415,13 @@ export function listenToEvents() {
});
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.listenToEvents`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.listenToEvents`,
);
}
void logger.attach(data.id, {
serviceId,
imageId,
});
} else if (status === 'destroy') {
await logMonitor.detach(data.id);
@ -468,10 +465,9 @@ export async function attachToRunning() {
for (const service of services) {
if (service.status === 'Running') {
const serviceId = service.serviceId;
const imageId = service.imageId;
if (serviceId == null || imageId == null) {
if (serviceId == null) {
throw new InternalInconsistencyError(
`serviceId and imageId not defined for service: ${service.serviceName} in ServiceManager.start`,
`serviceId not defined for service: ${service.serviceName} in ServiceManager.start`,
);
}
@ -482,7 +478,6 @@ export async function attachToRunning() {
}
void logger.attach(service.containerId, {
serviceId,
imageId,
});
}
}

View File

@ -103,8 +103,8 @@ export function enable(value: boolean = true) {
}
}
export function log(message: LogMessage) {
backend?.log(message);
export async function log(message: LogMessage) {
await backend?.log(message);
}
export function logSystemMessage(
@ -113,11 +113,13 @@ export function logSystemMessage(
eventName?: string,
track: boolean = true,
) {
const msgObj: LogMessage = { message, isSystem: true };
const msgObj: LogMessage = { message, isSystem: true, timestamp: Date.now() };
if (eventObj != null && eventObj.error != null) {
msgObj.isStdErr = true;
}
log(msgObj);
// IMPORTANT: this could potentially create a memory leak if logSystemMessage
// is used too quickly but we don't want supervisor logging to hold up other tasks
void log(msgObj);
if (track) {
eventTracker.track(
eventName != null ? eventName : message,
@ -132,20 +134,20 @@ export function lock(containerId: string): Bluebird.Disposer<() => void> {
});
}
type ServiceInfo = { serviceId: number; imageId: number };
export function attach(
type ServiceInfo = { serviceId: number };
export async function attach(
containerId: string,
{ serviceId, imageId }: ServiceInfo,
): Bluebird<void> {
{ serviceId }: ServiceInfo,
): Promise<void> {
// First detect if we already have an attached log stream
// for this container
if (logMonitor.isAttached(containerId)) {
return Bluebird.resolve();
return;
}
return Bluebird.using(lock(containerId), async () => {
await logMonitor.attach(containerId, (message) => {
log({ ...message, serviceId, imageId });
await logMonitor.attach(containerId, async (message) => {
await log({ ...message, serviceId });
});
});
}

View File

@ -76,10 +76,11 @@ export class BalenaLogBackend extends LogBackend {
return this.initialised;
}
public log(message: LogMessage) {
public async log(message: LogMessage) {
// TODO: Perhaps don't just drop logs when we haven't
// yet initialised (this happens when a device has not yet
// been provisioned)
// TODO: the backend should not be aware of unmanaged or publish state
if (this.unmanaged || !this.publishEnabled || !this.initialised) {
return;
}
@ -92,13 +93,10 @@ export class BalenaLogBackend extends LogBackend {
return;
}
message.timestamp ??= Date.now();
message.message = message.message
? _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
})
: '';
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
}

View File

@ -1,17 +1,15 @@
type BaseLogMessage = {
message: string;
isStdErr?: boolean;
timestamp?: number;
timestamp: number;
};
export type LogMessage = BaseLogMessage &
(
| {
serviceId?: number;
imageId?: number;
serviceId: number;
isSystem?: false;
}
| {
message: string;
isSystem: true;
}
);
@ -20,7 +18,7 @@ export abstract class LogBackend {
public unmanaged: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
public abstract log(message: LogMessage): Promise<void>;
}
export default LogBackend;

View File

@ -8,7 +8,7 @@ export type MonitorHook = (message: {
message: string;
isStdErr: boolean;
timestamp: number;
}) => void;
}) => Resolvable<void>;
// This is nowhere near the amount of fields provided by journald, but simply the ones
// that we are interested in
@ -67,6 +67,7 @@ class LogMonitor {
public async start(): Promise<void> {
try {
// TODO: do not spawn journalctl if logging is not enabled
const journalctl = spawnJournalctl({
all: true,
follow: true,
@ -95,7 +96,7 @@ class LogMonitor {
self.containers[row.CONTAINER_ID_FULL]
) {
self.setupAttempts = 0;
self.handleRow(row);
await self.handleRow(row);
}
} catch {
// ignore parsing errors
@ -137,7 +138,7 @@ class LogMonitor {
delete this.containers[containerId];
}
private handleRow(row: JournalRow) {
private async handleRow(row: JournalRow) {
if (
row.CONTAINER_ID_FULL == null ||
row.CONTAINER_NAME === 'balena_supervisor' ||
@ -156,7 +157,7 @@ class LogMonitor {
const isStdErr = row.PRIORITY === '3';
const timestamp = Math.floor(Number(row.__REALTIME_TIMESTAMP) / 1000); // microseconds to milliseconds
this.containers[containerId].hook({ message, isStdErr, timestamp });
await this.containers[containerId].hook({ message, isStdErr, timestamp });
this.lastSentTimestamp = timestamp;
}
}

View File

@ -52,8 +52,12 @@ describe('Logger', function () {
it('waits the grace period before sending any logs', async function () {
const clock = sinon.useFakeTimers();
logger.log({ message: 'foobar', serviceId: 15 });
clock.tick(4999);
await logger.log({
message: 'foobar',
serviceId: 15,
timestamp: Date.now(),
});
await clock.tickAsync(4999);
clock.restore();
await setTimeout(100);
@ -62,8 +66,12 @@ describe('Logger', function () {
it('tears down the connection after inactivity', async function () {
const clock = sinon.useFakeTimers();
logger.log({ message: 'foobar', serviceId: 15 });
clock.tick(61000);
await logger.log({
message: 'foobar',
serviceId: 15,
timestamp: Date.now(),
});
await clock.tickAsync(61000);
clock.restore();
await setTimeout(100);
@ -72,9 +80,9 @@ describe('Logger', function () {
it('sends logs as gzipped ndjson', async function () {
const timestamp = Date.now();
logger.log({ message: 'foobar', serviceId: 15 });
logger.log({ timestamp: 1337, message: 'foobar', serviceId: 15 });
logger.log({ message: 'foobar' }); // shold be ignored
await logger.log({ message: 'foobar', serviceId: 15, timestamp: 1000 });
await logger.log({ timestamp: 1337, message: 'foobar', serviceId: 15 });
await logger.log({ message: 'foobar', isSystem: true, timestamp: 1500 }); // shold be ignored
await setTimeout(5500);
expect(this.requestStub.calledOnce).to.be.true;