Support local logging via standard logging interface

Change-type: patch
Signed-off-by: Cameron Diver <cameron@resin.io>
Cameron Diver 2018-10-09 12:02:38 +01:00
parent 19cd310da3
commit 6e2801380b
7 changed files with 342 additions and 210 deletions

View File

@ -1,202 +1,17 @@
import * as Bluebird from 'bluebird';
import * as es from 'event-stream';
import { ClientRequest } from 'http';
import * as https from 'https';
import * as _ from 'lodash';
import * as Lock from 'rwlock';
import * as stream from 'stream';
import * as url from 'url';
import * as zlib from 'zlib';
import { EventTracker } from './event-tracker';
import Docker = require('./lib/docker-utils');
import { LogType } from './lib/log-types';
const ZLIB_TIMEOUT = 100;
const COOLDOWN_PERIOD = 5 * 1000;
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;
const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;
interface Options extends url.UrlWithParsedQuery {
method: string;
headers: Dictionary<string>;
}
type LogMessage = Dictionary<any>;
abstract class LogBackend {
public offlineMode: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
}
class ResinLogBackend extends LogBackend {
private req: ClientRequest | null = null;
private dropCount: number = 0;
private writable: boolean = true;
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
timeout: NodeJS.Timer;
public constructor(
apiEndpoint: string,
uuid: string,
deviceApiKey: string,
) {
super();
this.opts = url.parse(`${apiEndpoint}/device/v2/${uuid}/log-stream`) as any;
this.opts.method = 'POST';
this.opts.headers = {
Authorization: `Bearer ${deviceApiKey}`,
'Content-Type': 'application/x-ndjson',
'Content-Encoding': 'gzip',
};
// This stream serves as a message buffer during reconnections
// while we unpipe the old, malfunctioning connection and then repipe a
// new one.
this.stream = new stream.PassThrough({
allowHalfOpen: true,
// We halve the high watermark because a passthrough stream has two
// buffers, one for the writable and one for the readable side. The
// write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2,
});
this.stream.on('drain', () => {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
isStdErr: true,
});
this.dropCount = 0;
}
});
}
public log(message: LogMessage) {
if (this.offlineMode || !this.publishEnabled) {
return;
}
if (!_.isObject(message)) {
return;
}
message = _.assign({
timestamp: Date.now(),
message: '',
}, message);
if (!message.isSystem && message.serviceId == null) {
return;
}
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
}
private setup = _.throttle(() => {
this.req = https.request(this.opts);
// Since we haven't sent the request body yet, and never will, the
// only reason for the server to respond prematurely is to
// communicate an error. So tear down the connection immediately.
this.req.on('response', (res) => {
console.log('LogBackend: server responded with status code:', res.statusCode);
this.teardown();
});
this.req.on('timeout', () => this.teardown());
this.req.on('close', () => this.teardown());
this.req.on('error', (err) => {
console.log('LogBackend: unexpected error:', err);
this.teardown();
});
// Immediately flush the headers. This gives the server a chance to
// respond early with errors such as a 401 authentication error.
this.req.flushHeaders();
// We want a very low writable high watermark to prevent many chunks
// from being buffered in the writable queue of this.gzip, keeping them
// in this.stream instead. This matters because once this.gzip.flush()
// is called it performs all pending writes with that flush flag, which
// is not what we want: if there are 100 items in the queue we want to
// write all of them with Z_NO_FLUSH and only afterwards do a single
// Z_SYNC_FLUSH to maximize compression.
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', () => this.teardown());
this.gzip.pipe(this.req);
// Only start piping if there has been no error after the header flush.
// Doing it immediately could lose logs if the server turned out to be
// unavailable, because the request stream would consume our
// passthrough buffer.
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.stream.pipe(this.gzip);
this.flush();
}
}, RESPONSE_GRACE_PERIOD);
}, COOLDOWN_PERIOD);
private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
// Flushing every ZLIB_TIMEOUT strikes a balance between compression and
// latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x,
// whereas when ZLIB_TIMEOUT is infinity the compression ratio is around 10x.
private flush = _.throttle(() => {
if (this.gzip != null) {
this.gzip.flush(zlib.Z_SYNC_FLUSH);
}
}, ZLIB_TIMEOUT, { leading: false });
private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
}
this.req = null;
}
}
private write(message: LogMessage) {
this.snooze();
if (this.req == null) {
this.setup();
}
if (this.writable) {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} else {
this.dropCount += 1;
}
}
}
import {
LocalLogBackend,
LogBackend,
LogMessage,
ResinLogBackend,
} from './logging-backends';
interface LoggerSetupOptions {
apiEndpoint: string;
@ -204,6 +19,7 @@ interface LoggerSetupOptions {
deviceApiKey: string;
offlineMode: boolean;
enableLogs: boolean;
localMode: boolean;
}
type LogEventObject = Dictionary<any> | null;
@ -221,7 +37,11 @@ export class Logger {
private writeLock: (key: string) => Bluebird<() => void> = Bluebird.promisify(
new Lock().async.writeLock,
);
private backend: LogBackend | null = null;
private resinBackend: ResinLogBackend | null = null;
private localBackend: LocalLogBackend | null = null;
private eventTracker: EventTracker;
private attached: {
[key in OutputStream]: { [containerId: string]: boolean }
@ -241,13 +61,40 @@ export class Logger {
deviceApiKey,
offlineMode,
enableLogs,
localMode,
}: LoggerSetupOptions,
) {
this.backend = new ResinLogBackend(apiEndpoint, uuid, deviceApiKey);
this.resinBackend = new ResinLogBackend(apiEndpoint, uuid, deviceApiKey);
this.localBackend = new LocalLogBackend();
this.backend = localMode ? this.localBackend : this.resinBackend;
this.backend.offlineMode = offlineMode;
this.backend.publishEnabled = enableLogs;
}
public switchBackend(localMode: boolean) {
if (localMode) {
// Use the local mode backend
this.backend = this.localBackend;
console.log('Switching logging backend to LocalLogBackend');
} else {
// Use the resin backend
this.backend = this.resinBackend;
console.log('Switching logging backend to ResinLogBackend');
}
}
public getLocalBackend(): LocalLogBackend {
// TODO: Think about this interface a little more; it would be
// nicer to proxy the logs via the logger module
if (this.localBackend == null) {
// TODO: Type this as an internal inconsistency error
throw new Error('Local backend logger is not defined.');
}
return this.localBackend;
}
public enable(value: boolean = true) {
if (this.backend != null) {
this.backend.publishEnabled = value;
@ -349,27 +196,27 @@ export class Logger {
docker: Docker,
streamType: OutputStream,
containerId: string,
{ serviceId, imageId }: { serviceId: string, imageId: string },
): Bluebird<void> {
return Bluebird.try(() => {
if (this.attached[streamType][containerId]) {
return;
}
const logsOpts = {
follow: true,
stdout: streamType === OutputStream.Stdout,
stderr: streamType === OutputStream.Stderr,
timestamps: true,
since: Math.floor(Date.now() / 1000),
};
return docker.getContainer(containerId).logs(logsOpts)
.then((stream) => {
this.attached[streamType][containerId] = true;
stream
.on('error', (err) => {
console.error('Error on container logs', err);
this.attached[streamType][containerId] = false;
@ -400,9 +247,9 @@ export class Logger {
.on('end', () => {
this.attached[streamType][containerId] = false;
});
});
});
}
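
Note: the getLocalBackend() accessor above is what lets other components stream local-mode logs. As a rough sketch only (the Express router, route path, handler, and import paths below are hypothetical and not part of this commit), a local log endpoint could pipe the listener stream straight into an HTTP response:

import * as express from 'express';

import { Logger } from './logger'; // assumed path to the module above

// Hypothetical local-mode log endpoint: stream every local log message to
// the client as NDJSON, using the listener stream exposed by the backend.
export function createLocalLogRouter(logger: Logger): express.Router {
    const router = express.Router();

    router.get('/v2/local/logs', (_req, res) => {
        const listener = logger.getLocalBackend().attachListener();
        res.setHeader('Content-Type', 'application/x-ndjson');
        // Each chunk pushed by LocalLogBackend is already a single JSON line
        listener.pipe(res);
    });

    return router;
}

A production endpoint would also need to stop pushing to the stream once the client disconnects, which the current attachListener() interface does not yet support.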

View File

@ -0,0 +1,10 @@
import { LocalLogBackend } from './local-backend';
import { LogBackend, LogMessage } from './log-backend';
import { ResinLogBackend } from './resin-backend';
export {
LocalLogBackend,
LogBackend,
LogMessage,
ResinLogBackend,
};

View File

@ -0,0 +1,71 @@
import * as Bluebird from 'bluebird';
import * as _ from 'lodash';
import { Readable } from 'stream';
import { checkInt } from '../lib/validation';
import { LogBackend, LogMessage } from './log-backend';
export class LocalLogBackend extends LogBackend {
private globalListeners: Readable[] = [];
private serviceNameResolver: (serviceId: number) => Bluebird<string>;
public log(message: LogMessage): void {
if (this.publishEnabled) {
Bluebird.try(() => {
if (!message.isSystem) {
if (this.serviceNameResolver == null) {
// No service name resolver has been assigned yet, so drop the log
// TODO: Store these, and resolve them when a listener is attached
return null;
}
const svcId = checkInt(message.serviceId);
if (svcId == null) {
console.log('Warning: Non-integer service id found in local logs: ');
console.log(` ${JSON.stringify(message)}`);
return null;
}
// TODO: Can we cache this value? The service ids are reused, so
// we would need a way of invalidating the cache
return this.serviceNameResolver(svcId).then((serviceName) => {
return _.assign({}, { serviceName }, message);
});
} else {
return message;
}
})
.then((message: LogMessage | null) => {
if (message != null) {
_.each(this.globalListeners, (listener) => {
listener.push(`${JSON.stringify(message)}\n`);
});
}
})
.catch((e) => {
console.log('Error streaming local log output: ', e);
});
}
}
/**
* Get a stream which will be populated with log messages from
* local mode services and the supervisor
*/
public attachListener(): Readable {
const stream = new Readable({
// We don't actually need to do anything here
read: _.noop,
});
this.globalListeners.push(stream);
return stream;
}
public assignServiceNameResolver(resolver: (serviceId: number) => Bluebird<string>) {
this.serviceNameResolver = resolver;
}
}
export default LocalLogBackend;
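
For reference, a minimal, self-contained sketch of how this backend is used (the resolver and the import path are stand-ins; in the supervisor the service name would be looked up from the application state):

import * as Bluebird from 'bluebird';

import { LocalLogBackend } from './local-backend'; // assumed relative path

// Stand-in resolver: synthesises a name instead of querying real state.
const backend = new LocalLogBackend();
backend.assignServiceNameResolver((serviceId: number) =>
    Bluebird.resolve(`service-${serviceId}`),
);

// Every subsequent log call is pushed to attached listeners as one NDJSON
// line, with the resolved serviceName merged in for non-system messages.
backend.attachListener().pipe(process.stdout);

backend.log({ serviceId: 1, message: 'hello from a local-mode service', timestamp: Date.now() });
backend.log({ isSystem: true, message: 'a supervisor message', timestamp: Date.now() });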

View File

@ -0,0 +1,11 @@
export type LogMessage = Dictionary<any>;
export abstract class LogBackend {
public offlineMode: boolean;
public publishEnabled: boolean = true;
public abstract log(message: LogMessage): void;
}
export default LogBackend;
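
Since LogBackend is now the single extension point, any new log destination only has to subclass it and implement log(). A trivial, purely illustrative example (not part of this commit):

import { LogBackend, LogMessage } from './log-backend'; // assumed relative path

// Illustrative backend: writes everything to the console, showing the
// minimum needed to satisfy the abstract class.
class ConsoleLogBackend extends LogBackend {
    public log(message: LogMessage): void {
        // Honour the same gating flags as the real backends
        if (this.offlineMode || !this.publishEnabled) {
            return;
        }
        console.log(JSON.stringify(message));
    }
}

export default ConsoleLogBackend;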

View File

@ -0,0 +1,185 @@
import { ClientRequest } from 'http';
import * as https from 'https';
import * as _ from 'lodash';
import * as stream from 'stream';
import * as url from 'url';
import * as zlib from 'zlib';
import { LogBackend, LogMessage } from './log-backend';
const ZLIB_TIMEOUT = 100;
const COOLDOWN_PERIOD = 5 * 1000;
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;
const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;
interface Options extends url.UrlWithParsedQuery {
method: string;
headers: Dictionary<string>;
}
export class ResinLogBackend extends LogBackend {
private req: ClientRequest | null = null;
private dropCount: number = 0;
private writable: boolean = true;
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
timeout: NodeJS.Timer;
public constructor(
apiEndpoint: string,
uuid: string,
deviceApiKey: string,
) {
super();
this.opts = url.parse(`${apiEndpoint}/device/v2/${uuid}/log-stream`) as any;
this.opts.method = 'POST';
this.opts.headers = {
Authorization: `Bearer ${deviceApiKey}`,
'Content-Type': 'application/x-ndjson',
'Content-Encoding': 'gzip',
};
// This stream serves as a message buffer during reconnections
// while we unpipe the old, malfunctioning connection and then repipe a
// new one.
this.stream = new stream.PassThrough({
allowHalfOpen: true,
// We halve the high watermark because a passthrough stream has two
// buffers, one for the writable and one for the readable side. The
// write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2,
});
this.stream.on('drain', () => {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
isStdErr: true,
});
this.dropCount = 0;
}
});
}
public log(message: LogMessage) {
if (this.offlineMode || !this.publishEnabled) {
return;
}
if (!_.isObject(message)) {
return;
}
message = _.assign({
timestamp: Date.now(),
message: '',
}, message);
if (!message.isSystem && message.serviceId == null) {
return;
}
message.message = _.truncate(message.message, {
length: MAX_LOG_LENGTH,
omission: '[...]',
});
this.write(message);
}
private setup = _.throttle(() => {
this.req = https.request(this.opts);
// Since we haven't sent the request body yet, and never will, the
// only reason for the server to respond prematurely is to
// communicate an error. So tear down the connection immediately.
this.req.on('response', (res) => {
console.log('LogBackend: server responded with status code:', res.statusCode);
this.teardown();
});
this.req.on('timeout', () => this.teardown());
this.req.on('close', () => this.teardown());
this.req.on('error', (err) => {
console.log('LogBackend: unexpected error:', err);
this.teardown();
});
// Immediately flush the headers. This gives the server a chance to
// respond early with errors such as a 401 authentication error.
this.req.flushHeaders();
// We want a very low writable high watermark to prevent many chunks
// from being buffered in the writable queue of this.gzip, keeping them
// in this.stream instead. This matters because once this.gzip.flush()
// is called it performs all pending writes with that flush flag, which
// is not what we want: if there are 100 items in the queue we want to
// write all of them with Z_NO_FLUSH and only afterwards do a single
// Z_SYNC_FLUSH to maximize compression.
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', () => this.teardown());
this.gzip.pipe(this.req);
// Only start piping if there has been no error after the header flush.
// Doing it immediately could lose logs if the server turned out to be
// unavailable, because the request stream would consume our
// passthrough buffer.
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.stream.pipe(this.gzip);
this.flush();
}
}, RESPONSE_GRACE_PERIOD);
}, COOLDOWN_PERIOD);
private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
// Flushing every ZLIB_TIMEOUT strikes a balance between compression and
// latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x,
// whereas when ZLIB_TIMEOUT is infinity the compression ratio is around 10x.
private flush = _.throttle(() => {
if (this.gzip != null) {
this.gzip.flush(zlib.Z_SYNC_FLUSH);
}
}, ZLIB_TIMEOUT, { leading: false });
private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
}
this.req = null;
}
}
private write(message: LogMessage) {
this.snooze();
if (this.req == null) {
this.setup();
}
if (this.writable) {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} else {
this.dropCount += 1;
}
}
}
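
For context, this backend frames messages as gzipped NDJSON and relies on Z_SYNC_FLUSH to make buffered records readable without closing the stream. A small standalone sketch of that framing (illustration only, not part of the commit):

import * as zlib from 'zlib';

// Compress two NDJSON records and issue a sync flush, as the backend does on
// its ZLIB_TIMEOUT throttle, then decode the partial stream to show that the
// records are already recoverable even though the gzip stream is still open.
const gzip = zlib.createGzip();
const chunks: Buffer[] = [];
gzip.on('data', (chunk: Buffer) => chunks.push(chunk));

gzip.write(JSON.stringify({ message: 'first', timestamp: Date.now(), isSystem: true }) + '\n');
gzip.write(JSON.stringify({ message: 'second', timestamp: Date.now(), serviceId: 1 }) + '\n');

gzip.flush(zlib.constants.Z_SYNC_FLUSH, () => {
    // finishFlush tolerates the missing gzip trailer of an unfinished stream
    const decoded = zlib.gunzipSync(Buffer.concat(chunks), {
        finishFlush: zlib.constants.Z_SYNC_FLUSH,
    });
    console.log(decoded.toString()); // prints the two JSON lines
});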

View File

@ -22,6 +22,7 @@ startupConfigFields = [
'mixpanelToken'
'mixpanelHost'
'loggingEnabled'
'localMode'
]
module.exports = class Supervisor extends EventEmitter
@ -57,7 +58,13 @@ module.exports = class Supervisor extends EventEmitter
deviceApiKey: conf.deviceApiKey,
offlineMode: checkTruthy(conf.offlineMode),
enableLogs: checkTruthy(conf.loggingEnabled),
localMode: checkTruthy(conf.localMode)
})
# Set up log backend switching for local mode changes
@config.on 'change', (changed) =>
if changed.localMode?
@logger.switchBackend(checkTruthy(changed.localMode))
.then =>
@deviceState.init()
.then =>

View File

@ -33,6 +33,7 @@ describe 'Logger', ->
deviceApiKey: 'secretkey'
offlineMode: false
enableLogs: true
localMode: false
})
afterEach ->