logger: Use the new logging backend

Change-type: minor
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
This commit is contained in:
Petros Angelatos 2018-07-10 13:12:46 -07:00
parent 9dcf3ed697
commit 0d812c272c
No known key found for this signature in database
GPG Key ID: 7DE8B46CDBCB42A1
7 changed files with 154 additions and 244 deletions

View File

@ -252,7 +252,6 @@ module.exports = class APIBinder
belongs_to__user: conf.userId
is_managed_by__device: conf.deviceId
uuid: deviceRegister.generateUniqueKey()
logs_channel: deviceRegister.generateUniqueKey()
registered_at: Math.floor(Date.now() / 1000)
})
@resinApi.post
@ -306,27 +305,6 @@ module.exports = class APIBinder
console.log('Could not pin device to release!')
console.log('Error: ', e)
_sendLogsRequest: (uuid, data) =>
reqBody = _.map(data, (msg) -> _.mapKeys(msg, (v, k) -> _.snakeCase(k)))
@config.get('apiEndpoint')
.then (resinApiEndpoint) =>
endpoint = url.resolve(resinApiEndpoint, "/device/v2/#{uuid}/logs")
requestParams = _.extend
method: 'POST'
url: endpoint
body: reqBody
, @cachedResinApi.passthrough
@cachedResinApi._request(requestParams)
logDependent: (uuid, msg) =>
@_sendLogsRequest(uuid, [ msg ])
logBatch: (messages) =>
@config.get('uuid')
.then (uuid) =>
@_sendLogsRequest(uuid, messages)
# Creates the necessary config vars in the API to match the current device state,
# without overwriting any variables that are already set.
_reportInitialEnv: (apiEndpoint) =>

View File

@ -79,16 +79,13 @@ class Config extends EventEmitter {
// a JSON value, which is either null, or { app: number, commit: string }
pinDevice: { source: 'db', mutable: true, default: 'null' },
currentCommit: { source: 'db', mutable: true },
// Mutable functions, defined in mutableFuncs
logsChannelSecret: { source: 'func', mutable: true },
};
public constructor({ db, configPath }: ConfigOpts) {
super();
this.db = db;
this.configJsonBackend = new ConfigJsonConfigBackend(this.schema, configPath);
this.providerFunctions = createProviderFunctions(this, db);
this.providerFunctions = createProviderFunctions(this);
}
public init(): Bluebird<void> {
@ -245,10 +242,9 @@ class Config extends EventEmitter {
'uuid',
'deviceApiKey',
'apiSecret',
'logsChannelSecret',
'offlineMode',
])
.then(({ uuid, deviceApiKey, apiSecret, logsChannelSecret, offlineMode }) => {
.then(({ uuid, deviceApiKey, apiSecret, offlineMode }) => {
// These fields need to be set regardless
if (uuid == null || apiSecret == null) {
uuid = uuid || this.newUniqueKey();
@ -259,10 +255,8 @@ class Config extends EventEmitter {
if (offlineMode) {
return;
}
if (deviceApiKey == null || logsChannelSecret == null) {
deviceApiKey = deviceApiKey || this.newUniqueKey();
logsChannelSecret = logsChannelSecret || this.newUniqueKey();
return this.set({ deviceApiKey, logsChannelSecret });
if (deviceApiKey == null) {
return this.set({ deviceApiKey: this.newUniqueKey() });
}
});
});

View File

@ -3,7 +3,6 @@ import { Transaction } from 'knex';
import * as _ from 'lodash';
import Config = require('../config');
import DB = require('../db');
import supervisorVersion = require('../lib/supervisor-version');
import * as constants from '../lib/constants';
@ -25,41 +24,8 @@ export interface ConfigProviderFunctions {
[key: string]: ConfigProviderFunction;
}
export function createProviderFunctions(config: Config, db: DB): ConfigProviderFunctions {
export function createProviderFunctions(config: Config): ConfigProviderFunctions {
return {
logsChannelSecret: {
get: () => {
// Return the logsChannelSecret which corresponds to the current backend
return config.get('apiEndpoint')
.then((backend = '') => {
return db.models('logsChannelSecret').select('secret').where({ backend });
})
.then(([ conf ]) => {
if (conf != null) {
return conf.secret;
}
return;
});
},
set: (value: string, tx?: Transaction) => {
// Store the secret with the current backend
return config.get('apiEndpoint')
.then((backend: string) => {
return db.upsertModel(
'logsChannelSecret',
{ backend: backend || '', secret: value },
{ backend: backend || '' },
tx,
);
});
},
remove: () => {
return config.get('apiEndpoint')
.then((backend) => {
return db.models('logsChannelSecret').where({ backend: backend || '' }).del();
});
},
},
version: {
get: () => {
return Bluebird.resolve(supervisorVersion);

View File

@ -159,13 +159,11 @@ module.exports = class DeviceState extends EventEmitter
@config.on 'change', (changedConfig) =>
if changedConfig.loggingEnabled?
@logger.enable(changedConfig.loggingEnabled)
if changedConfig.nativeLogger?
@logger.switchBackend(changedConfig.nativeLogger)
if changedConfig.apiSecret?
@reportCurrentState(api_secret: changedConfig.apiSecret)
@config.getMany([
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant', 'logsChannelSecret',
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant',
'version', 'provisioned', 'apiEndpoint', 'connectivityCheckEnabled', 'legacyAppsPresent'
])
.then (conf) =>
@ -189,7 +187,7 @@ module.exports = class DeviceState extends EventEmitter
provisioning_progress: null
provisioning_state: ''
status: 'Idle'
logs_channel: conf.logsChannelSecret
logs_channel: null
update_failed: false
update_pending: false
update_downloaded: false

View File

@ -1,193 +1,171 @@
url = require 'url'
https = require 'https'
stream = require 'stream'
zlib = require 'zlib'
_ = require 'lodash'
PUBNUB = require 'pubnub'
Promise = require 'bluebird'
es = require 'event-stream'
Lock = require 'rwlock'
{ checkTruthy } = require './lib/validation'
class NativeLoggerBackend
constructor: ->
@logPublishInterval = 1000
@maxLogLinesPerBatch = 10
@maxQueueSize = 100
@maxLineLength = 300
@publishQueue = []
@intervalHandle = null
@publishEnabled = false
ZLIB_TIMEOUT = 100
COOLDOWN_PERIOD = 5 * 1000
KEEPALIVE_TIMEOUT = 60 * 1000
RESPONSE_GRACE_PERIOD = 5 * 1000
MAX_LOG_LENGTH = 10 * 1000
MAX_PENDING_BYTES = 256 * 1024
class LogBackend
constructor: (apiEndpoint, uuid, deviceApiKey) ->
@publishEnabled = true
@offlineMode = false
@shouldStop = false
publishLoop: =>
startTime = process.hrtime()
Promise.try =>
return if @offlineMode or !@publishEnabled or @publishQueue.length is 0
currentBatch = @publishQueue.splice(0, @maxLogLinesPerBatch)
# Silently ignore errors sending logs (just like with pubnub)
@apiBinder.logBatch(currentBatch)
.catchReturn()
.then =>
elapsedTime = process.hrtime(startTime)
elapsedTimeMs = elapsedTime[0] * 1000 + elapsedTime[1] / 1e6
nextDelay = Math.max(@logPublishInterval - elapsedTimeMs, 0)
Promise.delay(nextDelay)
.then =>
if !@shouldStop
@publishLoop()
@_req = null
@_dropCount = 0
@_writable = true
@_gzip = null
_truncateToMaxLength: (message) =>
if message.message.length > @maxLineLength
message = _.clone(message)
message.message = _.truncate(message.message, { length: @maxLineLength, omission: '[...]' })
return message
@_opts = url.parse("#{apiEndpoint}/device/v2/#{uuid}/log-stream")
@_opts.method = 'POST'
@_opts.headers = {
'Authorization': "Bearer #{deviceApiKey}"
'Content-Type': 'application/x-ndjson'
'Content-Encoding': 'gzip'
}
logDependent: (msg, { uuid }) =>
msg = @_truncateToMaxLength(msg)
@apiBinder.logDependent(uuid, msg)
# This stream serves as a message buffer during reconnections
# while we unpipe the old, malfunctioning connection and then repipe a
# new one.
@_stream = new stream.PassThrough({
allowHalfOpen: true
# We halve the high watermark because a passthrough stream has two
# buffers, one for the writable and one for the readable side. The
# write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2
})
@_stream.on 'drain', =>
@_writable = true
@_flush()
if @_dropCount > 0
@_write({
message: "Warning: Suppressed #{@_dropCount} message(s) due to high load"
timestamp: Date.now()
isSystem: true
isStdErr: true
})
@_dropCount = 0
log: (message) =>
return if @offlineMode or !@publishEnabled or @publishQueue.length >= @maxQueueSize
if _.isString(message)
message = { message: message }
@_setup = _.throttle(@_setup, COOLDOWN_PERIOD)
@_snooze = _.debounce(@_teardown, KEEPALIVE_TIMEOUT)
message = _.defaults({}, message, {
# Flushing every ZLIB_TIMEOUT hits a balance between compression and
# latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x
# whereas when ZLIB_TIMEOUT is infinity the compression ratio is around 10x.
@_flush = _.throttle(@_flush, ZLIB_TIMEOUT, leading: false)
_setup: ->
@_req = https.request(@_opts)
# Since we haven't sent the request body yet, and never will, the
# only reason for the server to prematurely respond is to
# communicate an error. So teardown the connection immediately
@_req.on 'response', (res) =>
console.log('LogBackend: server responded with status code:', res.statusCode)
@_teardown()
@_req.on('timeout', => @_teardown())
@_req.on('close', => @_teardown())
@_req.on 'error', (err) =>
console.log('LogBackend: unexpected error:', err)
@_teardown()
# Immediately flush the headers. This gives a chance to the server to
# respond with potential errors such as 401 authentication error
@_req.flushHeaders()
# We want a very low writable high watermark to prevent having many
# chunks stored in the writable queue of @_gzip and have them in
# @_stream instead. This is desirable because once @_gzip.flush() is
# called it will do all pending writes with that flush flag. This is
# not what we want though. If there are 100 items in the queue we want
# to write all of them with Z_NO_FLUSH and only afterwards do a
# Z_SYNC_FLUSH to maximize compression
@_gzip = zlib.createGzip(writableHighWaterMark: 1024)
@_gzip.on('error', => @_teardown())
@_gzip.pipe(@_req)
# Only start piping if there has been no error after the header flush.
# Doing it immediately would potentially lose logs if it turned out that
# the server is unavailable because @_req stream would consume our
# passthrough buffer
@_timeout = setTimeout(=>
@_stream.pipe(@_gzip)
@_flush()
, RESPONSE_GRACE_PERIOD)
_teardown: ->
if @_req isnt null
clearTimeout(@_timeout)
@_req.removeAllListeners()
@_req.on('error', _.noop)
# no-op if pipe hasn't happened yet
@_stream.unpipe(@_gzip)
@_gzip.end()
@_req = null
_flush: ->
@_gzip.flush(zlib.Z_SYNC_FLUSH)
_write: (msg) ->
@_snooze()
if @_req is null
@_setup()
if @_writable
@_writable = @_stream.write(JSON.stringify(msg) + '\n')
@_flush()
else
@_dropCount += 1
log: (msg) ->
if @offlineMode or !@publishEnabled
return
if !_.isObject(msg)
return
msg = _.assign({
timestamp: Date.now()
message: ''
})
}, msg)
message = @_truncateToMaxLength(message)
if !(msg.isSystem or msg.serviceId?)
return
if @publishQueue.length < @maxQueueSize - 1
@publishQueue.push(_.pick(message, [ 'message', 'timestamp', 'isSystem', 'isStderr', 'imageId' ]))
else
@publishQueue.push({
message: 'Warning! Some logs dropped due to high load'
timestamp: Date.now()
isSystem: true
isStderr: true
})
msg.message = _.truncate(msg.message, { length: MAX_LOG_LENGTH, omission: '[...]' })
start: (opts) =>
console.log('Starting native logger')
@apiBinder = opts.apiBinder
@publishEnabled = checkTruthy(opts.enable)
@offlineMode = checkTruthy(opts.offlineMode)
@publishLoop()
return null
stop: =>
console.log('Stopping native logger')
@shouldStop = true
class PubnubLoggerBackend
constructor: ->
@publishQueue = [[]]
@messageIndex = 0
@maxMessageIndex = 9
# Pubnub's message size limit is 32KB (unclear on whether it's KB or actually KiB,
# but we'll be conservative). So we limit a log message to 2 bytes less to account
# for the [ and ] in the array.
@maxLogByteSize = 30000
@publishQueueRemainingBytes = @maxLogByteSize
@logsOverflow = false
@logPublishInterval = 110
doPublish: =>
return if @offlineMode or !@publishEnabled or @publishQueue[0].length is 0
message = @publishQueue.shift()
@pubnub.publish({ @channel, message })
if @publishQueue.length is 0
@publishQueue = [[]]
@publishQueueRemainingBytes = @maxLogByteSize
@messageIndex = Math.max(@messageIndex - 1, 0)
@logsOverflow = false if @messageIndex < @maxMessageIndex
logDependent: (message, { channel }) ->
@pubnub.publish({ channel, message })
log: (msg) =>
return if @offlineMode or !@publishEnabled or (@messageIndex >= @maxMessageIndex and @publishQueueRemainingBytes <= 0)
if _.isString(message)
message = { m: msg }
else
message = {
m: msg.message
t: msg.timestamp
}
if msg.isSystem
message.s = 1
if msg.serviceId?
message.c = msg.serviceId
if msg.isStderr
message.e = 1
_.defaults message,
t: Date.now()
m: ''
msgLength = Buffer.byteLength(encodeURIComponent(JSON.stringify(message)), 'utf8')
return if msgLength > @maxLogByteSize # Unlikely, but we can't allow this
remaining = @publishQueueRemainingBytes - msgLength
if remaining >= 0
@publishQueue[@messageIndex].push(message)
@publishQueueRemainingBytes = remaining
else if @messageIndex < @maxMessageIndex
@messageIndex += 1
@publishQueue[@messageIndex] = [ message ]
@publishQueueRemainingBytes = @maxLogByteSize - msgLength
else if !@logsOverflow
@logsOverflow = true
@messageIndex += 1
@publishQueue[@messageIndex] = [ { m: 'Warning! Some logs dropped due to high load', t: Date.now(), s: 1 } ]
@publishQueueRemainingBytes = 0
start: (opts) =>
console.log('Starting pubnub logger')
@pubnub = PUBNUB.init(opts.pubnub)
@channel = opts.channel
@publishEnabled = checkTruthy(opts.enable)
@offlineMode = checkTruthy(opts.offlineMode)
@intervalHandle = setInterval(@doPublish, @logPublishInterval)
stop: =>
console.log('Stopping pubnub logger')
clearInterval(@intervalHandle)
@_write(msg)
module.exports = class Logger
constructor: ({ @eventTracker }) ->
_lock = new Lock()
@_writeLock = Promise.promisify(_lock.async.writeLock)
@attached = { stdout: {}, stderr: {} }
@opts = {}
@backend = null
init: (opts) =>
Promise.try =>
@opts = opts
@_startBackend()
stop: =>
if @backend?
@backend.stop()
_startBackend: =>
if checkTruthy(@opts.nativeLogger)
@backend = new NativeLoggerBackend()
else
@backend = new PubnubLoggerBackend()
@backend.start(@opts)
switchBackend: (nativeLogger) =>
if checkTruthy(nativeLogger) != checkTruthy(@opts.nativeLogger)
@opts.nativeLogger = nativeLogger
@backend.stop()
@_startBackend()
@backend = new LogBackend(opts.apiEndpoint, opts.uuid, opts.deviceApiKey)
@backend.offlineMode = checkTruthy(opts.offlineMode)
enable: (val) =>
@opts.enable = val
@backend.publishEnabled = checkTruthy(val) ? true
logDependent: (msg, device) =>
@backend.logDependent(msg, device)
msg.uuid = device.uuid
@backend.log(msg)
log: (msg) =>
@backend.log(msg)
@ -195,7 +173,7 @@ module.exports = class Logger
logSystemMessage: (msg, obj, eventName) =>
messageObj = { message: msg, isSystem: true }
if obj?.error?
messageObj.isStderr = true
messageObj.isStdErr = true
@log(messageObj)
@eventTracker.track(eventName ? msg, obj)
@ -222,9 +200,9 @@ module.exports = class Logger
.on 'data', (logLine) =>
space = logLine.indexOf(' ')
if space > 0
msg = { timestamp: logLine.substr(0, space), message: logLine.substr(space + 1), serviceId, imageId }
msg = { timestamp: (new Date(logLine.substr(0, space))).getTime(), message: logLine.substr(space + 1), serviceId, imageId }
if stdoutOrStderr == 'stderr'
msg.isStderr = true
msg.isStdErr = true
@log(msg)
.on 'error', (err) =>
console.error('Error on container logs', err, err.stack)

View File

@ -105,7 +105,6 @@ createProxyvisorRouter = (proxyvisor) ->
deviceId: dev.id
name: dev.name
status: dev.status
logs_channel: dev.logs_channel
}
db.models('dependentDevice').insert(deviceForDB)
.then ->
@ -137,7 +136,7 @@ createProxyvisorRouter = (proxyvisor) ->
.then ([ device ]) ->
return res.status(404).send('Device not found') if !device?
return res.status(410).send('Device deleted') if device.markedForDeletion
proxyvisor.logger.logDependent(m, { uuid, channel: "device-#{device.logs_channel}-logs" })
proxyvisor.logger.logDependent(m, uuid)
res.status(202).send('OK')
.catch (err) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
@ -260,7 +259,6 @@ module.exports = class Proxyvisor
is_online: dev.is_online
name: dev.name
status: dev.status
logs_channel: dev.logs_channel
targetCommit
targetConfig
targetEnvironment

View File

@ -13,15 +13,14 @@ constants = require './lib/constants'
startupConfigFields = [
'uuid'
'listenPort'
'apiEndpoint'
'apiSecret'
'apiTimeout'
'offlineMode'
'deviceApiKey'
'mixpanelToken'
'mixpanelHost'
'logsChannelSecret'
'pubnub'
'loggingEnabled'
'nativeLogger'
]
module.exports = class Supervisor extends EventEmitter
@ -41,7 +40,7 @@ module.exports = class Supervisor extends EventEmitter
init: =>
@db.init()
.tap =>
@config.init() # Ensures uuid, deviceApiKey, apiSecret and logsChannel
@config.init() # Ensures uuid, deviceApiKey, apiSecret
.then =>
@config.getMany(startupConfigFields)
.then (conf) =>
@ -52,10 +51,9 @@ module.exports = class Supervisor extends EventEmitter
@apiBinder.initClient()
.then =>
@logger.init({
nativeLogger: conf.nativeLogger
apiBinder: @apiBinder
pubnub: conf.pubnub
channel: "device-#{conf.logsChannelSecret}-logs"
apiEndpoint: conf.apiEndpoint
uuid: conf.uuid
deviceApiKey: conf.deviceApiKey
offlineMode: conf.offlineMode
enable: conf.loggingEnabled
})