Auto-merge for PR #701 via VersionBot

Implement new log backend
This commit is contained in:
resin-io-versionbot[bot] 2018-07-23 19:11:16 +00:00 committed by GitHub
commit 065f79390f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 264 additions and 329 deletions

View File

@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file
automatically by Versionist. DO NOT EDIT THIS FILE MANUALLY!
This project adheres to [Semantic Versioning](http://semver.org/).
## v7.16.0 - 2018-07-23
* Logger: Add unit tests #701 [Petros Angelatos]
* Logger: Remove pubnub leftovers #701 [Petros Angelatos]
* Logger: Only send logs produced after attaching #701 [Petros Angelatos]
* Logger: Use the new logging backend #701 [Petros Angelatos]
## v7.15.0 - 2018-07-17
* Allow the enabling and disabling of persistent logging via env var #700 [Cameron Diver]

View File

@ -101,8 +101,6 @@ RUN [ "cross-build-end" ]
FROM resin/$ARCH-supervisor-base:v1.2.0
ARG ARCH
ARG VERSION=master
ARG DEFAULT_PUBNUB_PUBLISH_KEY=pub-c-bananas
ARG DEFAULT_PUBNUB_SUBSCRIBE_KEY=sub-c-bananas
ARG DEFAULT_MIXPANEL_TOKEN=bananasbananas
WORKDIR /usr/src/app
@ -117,8 +115,6 @@ ENV CONFIG_MOUNT_POINT=/boot/config.json \
LED_FILE=/dev/null \
SUPERVISOR_IMAGE=resin/$ARCH-supervisor \
VERSION=$VERSION \
DEFAULT_PUBNUB_PUBLISH_KEY=$DEFAULT_PUBNUB_PUBLISH_KEY \
DEFAULT_PUBNUB_SUBSCRIBE_KEY=$DEFAULT_PUBNUB_SUBSCRIBE_KEY \
DEFAULT_MIXPANEL_TOKEN=$DEFAULT_MIXPANEL_TOKEN
HEALTHCHECK --interval=5m --start-period=1m --timeout=30s --retries=3 \

View File

@ -12,7 +12,7 @@
# Variables for build targets:
# * ARCH: amd64/rpi/i386/armv7hf/armel/aarch64 architecture for which to build the supervisor - default: amd64
# * IMAGE: image to build or deploy - default: resin/$(ARCH)-supervisor:latest
# * MIXPANEL_TOKEN, PUBNUB_SUBSCRIBE_KEY, PUBNUB_PUBLISH_KEY: (optional) default pubnub and mixpanel keys to embed in the supervisor image
# * MIXPANEL_TOKEN: (optional) default mixpanel key to embed in the supervisor image
# * DISABLE_CACHE: if set to true, run build with no cache - default: false
# * DOCKER_BUILD_OPTIONS: Additional options for docker build, like --cache-from parameters
#
@ -69,9 +69,7 @@ DOCKER_MAJOR_VERSION:=$(word 1, $(subst ., ,$(DOCKER_VERSION)))
DOCKER_MINOR_VERSION:=$(word 2, $(subst ., ,$(DOCKER_VERSION)))
DOCKER_GE_17_05 := $(shell [ $(DOCKER_MAJOR_VERSION) -gt 17 -o \( $(DOCKER_MAJOR_VERSION) -eq 17 -a $(DOCKER_MINOR_VERSION) -ge 5 \) ] && echo true)
# Default values for Pubnub and Mixpanel keys
PUBNUB_SUBSCRIBE_KEY ?= sub-c-bananas
PUBNUB_PUBLISH_KEY ?= pub-c-bananas
# Default values for Mixpanel key
MIXPANEL_TOKEN ?= bananasbananas
# Default architecture and output image
@ -151,8 +149,6 @@ endif
--no-cache=$(DISABLE_CACHE) \
--build-arg ARCH=$(ARCH) \
--build-arg VERSION=$(shell jq -r .version package.json) \
--build-arg DEFAULT_PUBNUB_PUBLISH_KEY=$(PUBNUB_PUBLISH_KEY) \
--build-arg DEFAULT_PUBNUB_SUBSCRIBE_KEY=$(PUBNUB_SUBSCRIBE_KEY) \
--build-arg DEFAULT_MIXPANEL_TOKEN=$(MIXPANEL_TOKEN) \
-t $(IMAGE) .

View File

@ -32,8 +32,6 @@ The `config.json` file should look something like this:
"apiEndpoint": "https://api.resinstaging.io", /* Endpoint for the resin.io API */
"deltaEndpoint": "https://delta.resinstaging.io", /* Endpoint for the delta server to download Docker binary diffs */
"vpnEndpoint": "vpn.resinstaging.io", /* Endpoint for the resin.io VPN server */
"pubnubSubscribeKey": "sub-c-aaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", /* Subscribe key for Pubnub for logs */
"pubnubPublishKey": "pub-c-aaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", /* Publish key for Pubnub for logs */
"listenPort": 48484, /* Listen port for the supervisor API */
"mixpanelToken": "aaaaaaaaaaaaaaaaaaaaaaaaaa", /* Mixpanel token to report events */
}

View File

@ -10,7 +10,7 @@
# * PUSH_IMAGES
# * CLEANUP
# * ENABLE_TESTS
# * PUBNUB_SUBSCRIBE_KEY, PUBNUB_PUBLISH_KEY, MIXPANEL_TOKEN: default keys to inject in the supervisor image
# * MIXPANEL_TOKEN: default key to inject in the supervisor image
# * EXTRA_TAG: when PUSH_IMAGES is true, additional tag to push to the registries
#
# Builds the supervisor for the architecture defined by $ARCH.
@ -76,8 +76,6 @@ tryPullForCache $NODE_BUILD_CACHE_MASTER
export DOCKER_BUILD_OPTIONS=${CACHE_FROM}
export ARCH
export PUBNUB_PUBLISH_KEY
export PUBNUB_SUBSCRIBE_KEY
export MIXPANEL_TOKEN
make IMAGE=$NODE_BUILD_IMAGE nodebuild

View File

@ -1,7 +1,7 @@
{
"name": "resin-supervisor",
"description": "This is resin.io's Supervisor, a program that runs on IoT devices and has the task of running user Apps (which are Docker containers), and updating them as Resin's API informs it to.",
"version": "7.15.0",
"version": "7.16.0",
"license": "Apache-2.0",
"repository": {
"type": "git",
@ -62,7 +62,6 @@
"node-loader": "^0.6.0",
"null-loader": "^0.1.1",
"pinejs-client": "^2.4.0",
"pubnub": "^3.7.13",
"register-coffee-coverage": "0.0.1",
"request": "^2.51.0",
"resin-lint": "^1.5.7",

View File

@ -252,7 +252,6 @@ module.exports = class APIBinder
belongs_to__user: conf.userId
is_managed_by__device: conf.deviceId
uuid: deviceRegister.generateUniqueKey()
logs_channel: deviceRegister.generateUniqueKey()
registered_at: Math.floor(Date.now() / 1000)
})
@resinApi.post
@ -306,27 +305,6 @@ module.exports = class APIBinder
console.log('Could not pin device to release!')
console.log('Error: ', e)
_sendLogsRequest: (uuid, data) =>
reqBody = _.map(data, (msg) -> _.mapKeys(msg, (v, k) -> _.snakeCase(k)))
@config.get('apiEndpoint')
.then (resinApiEndpoint) =>
endpoint = url.resolve(resinApiEndpoint, "/device/v2/#{uuid}/logs")
requestParams = _.extend
method: 'POST'
url: endpoint
body: reqBody
, @cachedResinApi.passthrough
@cachedResinApi._request(requestParams)
logDependent: (uuid, msg) =>
@_sendLogsRequest(uuid, [ msg ])
logBatch: (messages) =>
@config.get('uuid')
.then (uuid) =>
@_sendLogsRequest(uuid, messages)
# Creates the necessary config vars in the API to match the current device state,
# without overwriting any variables that are already set.
_reportInitialEnv: (apiEndpoint) =>

View File

@ -38,8 +38,6 @@ class Config extends EventEmitter {
registered_at: { source: 'config.json', mutable: true },
applicationId: { source: 'config.json' },
appUpdatePollInterval: { source: 'config.json', mutable: true, default: 60000 },
pubnubSubscribeKey: { source: 'config.json', default: constants.defaultPubnubSubscribeKey },
pubnubPublishKey: { source: 'config.json', default: constants.defaultPubnubPublishKey },
mixpanelToken: { source: 'config.json', default: constants.defaultMixpanelToken },
bootstrapRetryDelay: { source: 'config.json', default: 30000 },
supervisorOfflineMode: { source: 'config.json', default: false },
@ -49,7 +47,6 @@ class Config extends EventEmitter {
version: { source: 'func' },
currentApiKey: { source: 'func' },
offlineMode: { source: 'func' },
pubnub: { source: 'func' },
provisioned: { source: 'func' },
osVersion: { source: 'func' },
osVariant: { source: 'func' },
@ -75,20 +72,16 @@ class Config extends EventEmitter {
deltaVersion: { source: 'db', mutable: true, default: '2' },
lockOverride: { source: 'db', mutable: true, default: 'false' },
legacyAppsPresent: { source: 'db', mutable: true, default: 'false' },
nativeLogger: { source: 'db', mutable: true, default: 'true' },
// a JSON value, which is either null, or { app: number, commit: string }
pinDevice: { source: 'db', mutable: true, default: 'null' },
currentCommit: { source: 'db', mutable: true },
// Mutable functions, defined in mutableFuncs
logsChannelSecret: { source: 'func', mutable: true },
};
public constructor({ db, configPath }: ConfigOpts) {
super();
this.db = db;
this.configJsonBackend = new ConfigJsonConfigBackend(this.schema, configPath);
this.providerFunctions = createProviderFunctions(this, db);
this.providerFunctions = createProviderFunctions(this);
}
public init(): Bluebird<void> {
@ -245,10 +238,9 @@ class Config extends EventEmitter {
'uuid',
'deviceApiKey',
'apiSecret',
'logsChannelSecret',
'offlineMode',
])
.then(({ uuid, deviceApiKey, apiSecret, logsChannelSecret, offlineMode }) => {
.then(({ uuid, deviceApiKey, apiSecret, offlineMode }) => {
// These fields need to be set regardless
if (uuid == null || apiSecret == null) {
uuid = uuid || this.newUniqueKey();
@ -259,10 +251,8 @@ class Config extends EventEmitter {
if (offlineMode) {
return;
}
if (deviceApiKey == null || logsChannelSecret == null) {
deviceApiKey = deviceApiKey || this.newUniqueKey();
logsChannelSecret = logsChannelSecret || this.newUniqueKey();
return this.set({ deviceApiKey, logsChannelSecret });
if (deviceApiKey == null) {
return this.set({ deviceApiKey: this.newUniqueKey() });
}
});
});

View File

@ -3,7 +3,6 @@ import { Transaction } from 'knex';
import * as _ from 'lodash';
import Config = require('../config');
import DB = require('../db');
import supervisorVersion = require('../lib/supervisor-version');
import * as constants from '../lib/constants';
@ -25,41 +24,8 @@ export interface ConfigProviderFunctions {
[key: string]: ConfigProviderFunction;
}
export function createProviderFunctions(config: Config, db: DB): ConfigProviderFunctions {
export function createProviderFunctions(config: Config): ConfigProviderFunctions {
return {
logsChannelSecret: {
get: () => {
// Return the logsChannelSecret which corresponds to the current backend
return config.get('apiEndpoint')
.then((backend = '') => {
return db.models('logsChannelSecret').select('secret').where({ backend });
})
.then(([ conf ]) => {
if (conf != null) {
return conf.secret;
}
return;
});
},
set: (value: string, tx?: Transaction) => {
// Store the secret with the current backend
return config.get('apiEndpoint')
.then((backend: string) => {
return db.upsertModel(
'logsChannelSecret',
{ backend: backend || '', secret: value },
{ backend: backend || '' },
tx,
);
});
},
remove: () => {
return config.get('apiEndpoint')
.then((backend) => {
return db.models('logsChannelSecret').where({ backend: backend || '' }).del();
});
},
},
version: {
get: () => {
return Bluebird.resolve(supervisorVersion);
@ -81,18 +47,6 @@ export function createProviderFunctions(config: Config, db: DB): ConfigProviderF
});
},
},
pubnub: {
get: () => {
return config.getMany([ 'pubnubSubscribeKey', 'pubnubPublishKey' ])
.then(({ pubnubSubscribeKey, pubnubPublishKey }) => {
return {
subscribe_key: pubnubSubscribeKey,
publish_key: pubnubPublishKey,
ssl: true,
};
});
},
},
provisioned: {
get: () => {
return config.getMany([

View File

@ -23,7 +23,6 @@ module.exports = class DeviceConfig
deltaRetryInterval: { envVarName: 'RESIN_SUPERVISOR_DELTA_RETRY_INTERVAL', varType: 'int', defaultValue: '10000' }
deltaVersion: { envVarName: 'RESIN_SUPERVISOR_DELTA_VERSION', varType: 'int', defaultValue: '2' }
lockOverride: { envVarName: 'RESIN_SUPERVISOR_OVERRIDE_LOCK', varType: 'bool', defaultValue: 'false' }
nativeLogger: { envVarName: 'RESIN_SUPERVISOR_NATIVE_LOGGER', varType: 'bool', defaultValue: 'true' }
persistentLogging: { envVarName: 'RESIN_SUPERVISOR_PERSISTENT_LOGGING', varType: 'bool', defaultValue: 'false', rebootRequired: true }
}
@validKeys = [

View File

@ -159,13 +159,11 @@ module.exports = class DeviceState extends EventEmitter
@config.on 'change', (changedConfig) =>
if changedConfig.loggingEnabled?
@logger.enable(changedConfig.loggingEnabled)
if changedConfig.nativeLogger?
@logger.switchBackend(changedConfig.nativeLogger)
if changedConfig.apiSecret?
@reportCurrentState(api_secret: changedConfig.apiSecret)
@config.getMany([
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant', 'logsChannelSecret',
'initialConfigSaved', 'listenPort', 'apiSecret', 'osVersion', 'osVariant',
'version', 'provisioned', 'apiEndpoint', 'connectivityCheckEnabled', 'legacyAppsPresent'
])
.then (conf) =>
@ -189,7 +187,7 @@ module.exports = class DeviceState extends EventEmitter
provisioning_progress: null
provisioning_state: ''
status: 'Idle'
logs_channel: conf.logsChannelSecret
logs_channel: null
update_failed: false
update_pending: false
update_downloaded: false

View File

@ -26,8 +26,6 @@ const constants = {
proxyvisorHookReceiver:
checkString(process.env.RESIN_PROXYVISOR_HOOK_RECEIVER) || 'http://0.0.0.0:1337',
configJsonNonAtomicPath: '/boot/config.json',
defaultPubnubSubscribeKey: process.env.DEFAULT_PUBNUB_SUBSCRIBE_KEY,
defaultPubnubPublishKey: process.env.DEFAULT_PUBNUB_PUBLISH_KEY,
defaultMixpanelToken: process.env.DEFAULT_MIXPANEL_TOKEN,
supervisorNetworkInterface: supervisorNetworkInterface,
allowedInterfaces: [ 'resin-vpn', 'tun0', 'docker0', 'lo', supervisorNetworkInterface ],

View File

@ -1,193 +1,171 @@
url = require 'url'
https = require 'https'
stream = require 'stream'
zlib = require 'zlib'
_ = require 'lodash'
PUBNUB = require 'pubnub'
Promise = require 'bluebird'
es = require 'event-stream'
Lock = require 'rwlock'
{ checkTruthy } = require './lib/validation'
class NativeLoggerBackend
constructor: ->
@logPublishInterval = 1000
@maxLogLinesPerBatch = 10
@maxQueueSize = 100
@maxLineLength = 300
@publishQueue = []
@intervalHandle = null
@publishEnabled = false
ZLIB_TIMEOUT = 100
COOLDOWN_PERIOD = 5 * 1000
KEEPALIVE_TIMEOUT = 60 * 1000
RESPONSE_GRACE_PERIOD = 5 * 1000
MAX_LOG_LENGTH = 10 * 1000
MAX_PENDING_BYTES = 256 * 1024
class LogBackend
constructor: (apiEndpoint, uuid, deviceApiKey) ->
@publishEnabled = true
@offlineMode = false
@shouldStop = false
publishLoop: =>
startTime = process.hrtime()
Promise.try =>
return if @offlineMode or !@publishEnabled or @publishQueue.length is 0
currentBatch = @publishQueue.splice(0, @maxLogLinesPerBatch)
# Silently ignore errors sending logs (just like with pubnub)
@apiBinder.logBatch(currentBatch)
.catchReturn()
.then =>
elapsedTime = process.hrtime(startTime)
elapsedTimeMs = elapsedTime[0] * 1000 + elapsedTime[1] / 1e6
nextDelay = Math.max(@logPublishInterval - elapsedTimeMs, 0)
Promise.delay(nextDelay)
.then =>
if !@shouldStop
@publishLoop()
@_req = null
@_dropCount = 0
@_writable = true
@_gzip = null
_truncateToMaxLength: (message) =>
if message.message.length > @maxLineLength
message = _.clone(message)
message.message = _.truncate(message.message, { length: @maxLineLength, omission: '[...]' })
return message
@_opts = url.parse("#{apiEndpoint}/device/v2/#{uuid}/log-stream")
@_opts.method = 'POST'
@_opts.headers = {
'Authorization': "Bearer #{deviceApiKey}"
'Content-Type': 'application/x-ndjson'
'Content-Encoding': 'gzip'
}
logDependent: (msg, { uuid }) =>
msg = @_truncateToMaxLength(msg)
@apiBinder.logDependent(uuid, msg)
# This stream serves serves as a message buffer during reconnections
# while we unpipe the old, malfunctioning connection and then repipe a
# new one.
@_stream = new stream.PassThrough({
allowHalfOpen: true
# We halve the high watermark because a passthrough stream has two
# buffers, one for the writable and one for the readable side. The
# write() call only returns false when both buffers are full.
highWaterMark: MAX_PENDING_BYTES / 2
})
@_stream.on 'drain', =>
@_writable = true
@_flush()
if @_dropCount > 0
@_write({
message: "Warning: Suppressed #{@_dropCount} message(s) due to high load"
timestamp: Date.now()
isSystem: true
isStdErr: true
})
@_dropCount = 0
log: (message) =>
return if @offlineMode or !@publishEnabled or @publishQueue.length >= @maxQueueSize
if _.isString(message)
message = { message: message }
@_setup = _.throttle(@_setup, COOLDOWN_PERIOD)
@_snooze = _.debounce(@_teardown, KEEPALIVE_TIMEOUT)
message = _.defaults({}, message, {
# Flushing every ZLIB_TIMEOUT hits a balance between compression and
# latency. When ZLIB_TIMEOUT is 0 the compression ratio is around 5x
# whereas when ZLIB_TIMEOUT is infinity the compession ratio is around 10x.
@_flush = _.throttle(@_flush, ZLIB_TIMEOUT, leading: false)
_setup: ->
@_req = https.request(@_opts)
# Since we haven't sent the request body yet, and never will,the
# only reason for the server to prematurely respond is to
# communicate an error. So teardown the connection immediately
@_req.on 'response', (res) =>
console.log('LogBackend: server responded with status code:', res.statusCode)
@_teardown()
@_req.on('timeout', => @_teardown())
@_req.on('close', => @_teardown())
@_req.on 'error', (err) =>
console.log('LogBackend: unexpected error:', err)
@_teardown()
# Immediately flush the headers. This gives a chance to the server to
# respond with potential errors such as 401 authentication error
@_req.flushHeaders()
# We want a very low writable high watermark to prevent having many
# chunks stored in the writable queue of @_gzip and have them in
# @_stream instead. This is desirable because once @_gzip.flush() is
# called it will do all pending writes with that flush flag. This is
# not what we want though. If there are 100 items in the queue we want
# to write all of them with Z_NO_FLUSH and only afterwards do a
# Z_SYNC_FLUSH to maximize compression
@_gzip = zlib.createGzip(writableHighWaterMark: 1024)
@_gzip.on('error', => @_teardown())
@_gzip.pipe(@_req)
# Only start piping if there has been no error after the header flush.
# Doing it immediately would potentialy lose logs if it turned out that
# the server is unavailalbe because @_req stream would consume our
# passthrough buffer
@_timeout = setTimeout(=>
@_stream.pipe(@_gzip)
@_flush()
, RESPONSE_GRACE_PERIOD)
_teardown: ->
if @_req isnt null
clearTimeout(@_timeout)
@_req.removeAllListeners()
@_req.on('error', _.noop)
# no-op if pipe hasn't happened yet
@_stream.unpipe(@_gzip)
@_gzip.end()
@_req = null
_flush: ->
@_gzip.flush(zlib.Z_SYNC_FLUSH)
_write: (msg) ->
@_snooze()
if @_req is null
@_setup()
if @_writable
@_writable = @_stream.write(JSON.stringify(msg) + '\n')
@_flush()
else
@_dropCount += 1
log: (msg) ->
if @offlineMode or !@publishEnabled
return
if !_.isObject(msg)
return
msg = _.assign({
timestamp: Date.now()
message: ''
})
}, msg)
message = @_truncateToMaxLength(message)
if !(msg.isSystem or msg.serviceId?)
return
if @publishQueue.length < @maxQueueSize - 1
@publishQueue.push(_.pick(message, [ 'message', 'timestamp', 'isSystem', 'isStderr', 'imageId' ]))
else
@publishQueue.push({
message: 'Warning! Some logs dropped due to high load'
timestamp: Date.now()
isSystem: true
isStderr: true
})
msg.message = _.truncate(msg.message, { length: MAX_LOG_LENGTH, omission: '[...]' })
start: (opts) =>
console.log('Starting native logger')
@apiBinder = opts.apiBinder
@publishEnabled = checkTruthy(opts.enable)
@offlineMode = checkTruthy(opts.offlineMode)
@publishLoop()
return null
stop: =>
console.log('Stopping native logger')
@shouldStop = true
class PubnubLoggerBackend
constructor: ->
@publishQueue = [[]]
@messageIndex = 0
@maxMessageIndex = 9
# Pubnub's message size limit is 32KB (unclear on whether it's KB or actually KiB,
# but we'll be conservative). So we limit a log message to 2 bytes less to account
# for the [ and ] in the array.
@maxLogByteSize = 30000
@publishQueueRemainingBytes = @maxLogByteSize
@logsOverflow = false
@logPublishInterval = 110
doPublish: =>
return if @offlineMode or !@publishEnabled or @publishQueue[0].length is 0
message = @publishQueue.shift()
@pubnub.publish({ @channel, message })
if @publishQueue.length is 0
@publishQueue = [[]]
@publishQueueRemainingBytes = @maxLogByteSize
@messageIndex = Math.max(@messageIndex - 1, 0)
@logsOverflow = false if @messageIndex < @maxMessageIndex
logDependent: (message, { channel }) ->
@pubnub.publish({ channel, message })
log: (msg) =>
return if @offlineMode or !@publishEnabled or (@messageIndex >= @maxMessageIndex and @publishQueueRemainingBytes <= 0)
if _.isString(message)
message = { m: msg }
else
message = {
m: msg.message
t: msg.timestamp
}
if msg.isSystem
message.s = 1
if msg.serviceId?
message.c = msg.serviceId
if msg.isStderr
message.e = 1
_.defaults message,
t: Date.now()
m: ''
msgLength = Buffer.byteLength(encodeURIComponent(JSON.stringify(message)), 'utf8')
return if msgLength > @maxLogByteSize # Unlikely, but we can't allow this
remaining = @publishQueueRemainingBytes - msgLength
if remaining >= 0
@publishQueue[@messageIndex].push(message)
@publishQueueRemainingBytes = remaining
else if @messageIndex < @maxMessageIndex
@messageIndex += 1
@publishQueue[@messageIndex] = [ message ]
@publishQueueRemainingBytes = @maxLogByteSize - msgLength
else if !@logsOverflow
@logsOverflow = true
@messageIndex += 1
@publishQueue[@messageIndex] = [ { m: 'Warning! Some logs dropped due to high load', t: Date.now(), s: 1 } ]
@publishQueueRemainingBytes = 0
start: (opts) =>
console.log('Starting pubnub logger')
@pubnub = PUBNUB.init(opts.pubnub)
@channel = opts.channel
@publishEnabled = checkTruthy(opts.enable)
@offlineMode = checkTruthy(opts.offlineMode)
@intervalHandle = setInterval(@doPublish, @logPublishInterval)
stop: =>
console.log('Stopping pubnub logger')
clearInterval(@intervalHandle)
@_write(msg)
module.exports = class Logger
constructor: ({ @eventTracker }) ->
_lock = new Lock()
@_writeLock = Promise.promisify(_lock.async.writeLock)
@attached = { stdout: {}, stderr: {} }
@opts = {}
@backend = null
init: (opts) =>
Promise.try =>
@opts = opts
@_startBackend()
stop: =>
if @backend?
@backend.stop()
_startBackend: =>
if checkTruthy(@opts.nativeLogger)
@backend = new NativeLoggerBackend()
else
@backend = new PubnubLoggerBackend()
@backend.start(@opts)
switchBackend: (nativeLogger) =>
if checkTruthy(nativeLogger) != checkTruthy(@opts.nativeLogger)
@opts.nativeLogger = nativeLogger
@backend.stop()
@_startBackend()
@backend = new LogBackend(opts.apiEndpoint, opts.uuid, opts.deviceApiKey)
@backend.offlineMode = checkTruthy(opts.offlineMode)
enable: (val) =>
@opts.enable = val
@backend.publishEnabled = checkTruthy(val) ? true
logDependent: (msg, device) =>
@backend.logDependent(msg, device)
msg.uuid = device.uuid
@backend.log(msg)
log: (msg) =>
@backend.log(msg)
@ -195,7 +173,7 @@ module.exports = class Logger
logSystemMessage: (msg, obj, eventName) =>
messageObj = { message: msg, isSystem: true }
if obj?.error?
messageObj.isStderr = true
messageObj.isStdErr = true
@log(messageObj)
@eventTracker.track(eventName ? msg, obj)
@ -209,7 +187,7 @@ module.exports = class Logger
if stdoutOrStderr not in [ 'stdout', 'stderr' ]
throw new Error("Invalid log selection #{stdoutOrStderr}")
if !@attached[stdoutOrStderr][containerId]
logsOpts = { follow: true, stdout: stdoutOrStderr == 'stdout', stderr: stdoutOrStderr == 'stderr', timestamps: true }
logsOpts = { follow: true, stdout: stdoutOrStderr == 'stdout', stderr: stdoutOrStderr == 'stderr', timestamps: true, since: Math.floor(Date.now() / 1000) }
docker.getContainer(containerId)
.logs(logsOpts)
.then (stream) =>
@ -222,9 +200,9 @@ module.exports = class Logger
.on 'data', (logLine) =>
space = logLine.indexOf(' ')
if space > 0
msg = { timestamp: logLine.substr(0, space), message: logLine.substr(space + 1), serviceId, imageId }
msg = { timestamp: (new Date(logLine.substr(0, space))).getTime(), message: logLine.substr(space + 1), serviceId, imageId }
if stdoutOrStderr == 'stderr'
msg.isStderr = true
msg.isStdErr = true
@log(msg)
.on 'error', (err) =>
console.error('Error on container logs', err, err.stack)

View File

@ -105,7 +105,6 @@ createProxyvisorRouter = (proxyvisor) ->
deviceId: dev.id
name: dev.name
status: dev.status
logs_channel: dev.logs_channel
}
db.models('dependentDevice').insert(deviceForDB)
.then ->
@ -137,7 +136,7 @@ createProxyvisorRouter = (proxyvisor) ->
.then ([ device ]) ->
return res.status(404).send('Device not found') if !device?
return res.status(410).send('Device deleted') if device.markedForDeletion
proxyvisor.logger.logDependent(m, { uuid, channel: "device-#{device.logs_channel}-logs" })
proxyvisor.logger.logDependent(m, uuid)
res.status(202).send('OK')
.catch (err) ->
console.error("Error on #{req.method} #{url.parse(req.url).pathname}", err, err.stack)
@ -260,7 +259,6 @@ module.exports = class Proxyvisor
is_online: dev.is_online
name: dev.name
status: dev.status
logs_channel: dev.logs_channel
targetCommit
targetConfig
targetEnvironment

View File

@ -13,15 +13,14 @@ constants = require './lib/constants'
startupConfigFields = [
'uuid'
'listenPort'
'apiEndpoint'
'apiSecret'
'apiTimeout'
'offlineMode'
'deviceApiKey'
'mixpanelToken'
'mixpanelHost'
'logsChannelSecret'
'pubnub'
'loggingEnabled'
'nativeLogger'
]
module.exports = class Supervisor extends EventEmitter
@ -41,7 +40,7 @@ module.exports = class Supervisor extends EventEmitter
init: =>
@db.init()
.tap =>
@config.init() # Ensures uuid, deviceApiKey, apiSecret and logsChannel
@config.init() # Ensures uuid, deviceApiKey, apiSecret
.then =>
@config.getMany(startupConfigFields)
.then (conf) =>
@ -52,10 +51,9 @@ module.exports = class Supervisor extends EventEmitter
@apiBinder.initClient()
.then =>
@logger.init({
nativeLogger: conf.nativeLogger
apiBinder: @apiBinder
pubnub: conf.pubnub
channel: "device-#{conf.logsChannelSecret}-logs"
apiEndpoint: conf.apiEndpoint
uuid: conf.uuid
deviceApiKey: conf.deviceApiKey
offlineMode: conf.offlineMode
enable: conf.loggingEnabled
})

View File

@ -35,10 +35,6 @@ describe 'Config', ->
promise = @conf.getMany([ 'applicationId', 'apiEndpoint' ])
expect(promise).to.eventually.deep.equal({ applicationId: 78373, apiEndpoint: 'https://api.resin.io' })
it 'provides the correct pubnub config', ->
promise = @conf.get('pubnub')
expect(promise).to.eventually.deep.equal({ subscribe_key: 'foo', publish_key: 'bar', ssl: true })
it 'generates a uuid and stores it in config.json', ->
promise = @conf.get('uuid')
promise2 = fs.readFileAsync('./test/data/config.json').then(JSON.parse).get('uuid')
@ -122,15 +118,6 @@ describe 'Config', ->
@conf = new Config({ @db })
@initialization = @db.init().then =>
@conf.init()
it 'should allow setting of mutable function config options', ->
@conf.set({ logsChannelSecret: 'test' })
.then =>
expect(@conf.get('logsChannelSecret')).to.eventually.equal('test')
it 'should allow removing of mutabe function config options', ->
@conf.remove('logsChannelSecret')
.then =>
expect(@conf.get('logsChannelSecret')).to.eventually.be.undefined
it 'should throw if a non-mutable function provider is set', ->
expect(@conf.set({ version: 'some-version' })).to.be.rejected

View File

@ -42,7 +42,6 @@ testTarget1 = {
'RESIN_SUPERVISOR_DELTA_VERSION': '2'
'RESIN_SUPERVISOR_LOCAL_MODE': 'false'
'RESIN_SUPERVISOR_LOG_CONTROL': 'true'
'RESIN_SUPERVISOR_NATIVE_LOGGER': 'true'
'RESIN_SUPERVISOR_OVERRIDE_LOCK': 'false'
'RESIN_SUPERVISOR_POLL_INTERVAL': '60000'
'RESIN_SUPERVISOR_VPN_CONTROL': 'true'
@ -125,7 +124,6 @@ testTargetWithDefaults2 = {
'RESIN_SUPERVISOR_DELTA_VERSION': '2'
'RESIN_SUPERVISOR_LOCAL_MODE': 'false'
'RESIN_SUPERVISOR_LOG_CONTROL': 'true'
'RESIN_SUPERVISOR_NATIVE_LOGGER': 'true'
'RESIN_SUPERVISOR_OVERRIDE_LOCK': 'false'
'RESIN_SUPERVISOR_POLL_INTERVAL': '60000'
'RESIN_SUPERVISOR_VPN_CONTROL': 'true'

View File

@ -1,39 +1,106 @@
https = require 'https'
stream = require 'stream'
zlib = require 'zlib'
Promise = require 'bluebird'
m = require 'mochainon'
{ expect } = m.chai
{ spy, useFakeTimers } = m.sinon
{ stub } = m.sinon
Logger = require '../src/logger'
describe 'Logger', ->
before ->
@fakeBinder = {
logBatch: spy()
}
beforeEach ->
@_req = new stream.PassThrough()
@_req.flushHeaders = m.sinon.spy()
@_req.end = m.sinon.spy()
@_req.body = ''
@_req
.pipe(zlib.createGunzip())
.on 'data', (chunk) =>
@_req.body += chunk
stub(https, 'request').returns(@_req)
@fakeEventTracker = {
track: spy()
track: m.sinon.spy()
}
@logger = new Logger({ eventTracker: @fakeEventTracker })
@logger.init({ pubnub: {}, channel: 'foo', offlineMode: 'false', enable: 'true', nativeLogger: 'true', apiBinder: @fakeBinder })
after ->
@logger.stop()
@logger = new Logger({eventTracker: @fakeEventTracker})
@logger.init({
apiEndpoint: 'https://example.com'
uuid: 'deadbeef'
deviceApiKey: 'secretkey'
offlineMode: false
})
it 'publishes logs to the resin API by default', (done) ->
theTime = Date.now()
@logger.log(message: 'Hello!', timestamp: theTime)
setTimeout( =>
expect(@fakeBinder.logBatch).to.be.calledWith([ { message: 'Hello!', timestamp: theTime } ])
@fakeBinder.logBatch.reset()
done()
, 1020)
afterEach ->
https.request.restore()
it 'allows logging system messages which are also reported to the eventTracker', (done) ->
clock = useFakeTimers()
clock.tick(10)
@logger.logSystemMessage('Hello there!', { someProp: 'someVal' }, 'Some event name')
it 'waits the grace period before sending any logs', ->
clock = m.sinon.useFakeTimers()
@logger.log({message: 'foobar', serviceId: 15})
clock.tick(4999)
clock.restore()
setTimeout( =>
expect(@fakeBinder.logBatch).to.be.calledWith([ { message: 'Hello there!', timestamp: 10, isSystem: true } ])
Promise.delay(10)
.then =>
expect(@_req.body).to.equal('')
it 'tears down the connection after inactivity', ->
clock = m.sinon.useFakeTimers()
@logger.log({message: 'foobar', serviceId: 15})
clock.tick(61000)
clock.restore()
Promise.delay(10)
.then =>
expect(@_req.end.calledOnce).to.be.true
it 'sends logs as gzipped ndjson', ->
clock = m.sinon.useFakeTimers()
@logger.log({ message: 'foobar', serviceId: 15 })
@logger.log({ timestamp: 1337, message: 'foobar', serviceId: 15 })
@logger.log({ message: 'foobar' }) # shold be ignored
clock.tick(10000)
clock.restore()
expect(https.request.calledOnce).to.be.true
opts = https.request.firstCall.args[0]
expect(opts.href).to.equal('https://example.com/device/v2/deadbeef/log-stream')
expect(opts.method).to.equal('POST')
expect(opts.headers).to.deep.equal({
'Authorization': 'Bearer secretkey'
'Content-Type': 'application/x-ndjson'
'Content-Encoding': 'gzip'
})
# small delay for the streams to propagate data
Promise.delay(10)
.then =>
lines = @_req.body.split('\n')
expect(lines.length).to.equal(3)
expect(lines[2]).to.equal('')
msg = JSON.parse(lines[0])
expect(msg).to.deep.equal({ timestamp: 0, message: 'foobar', serviceId: 15 })
msg = JSON.parse(lines[1])
expect(msg).to.deep.equal({ timestamp: 1337, message: 'foobar', serviceId: 15 })
it 'allows logging system messages which are also reported to the eventTracker', ->
clock = m.sinon.useFakeTimers()
@logger.logSystemMessage('Hello there!', { someProp: 'someVal' }, 'Some event name')
clock.tick(10000)
clock.restore()
Promise.delay(10)
.then =>
expect(@fakeEventTracker.track).to.be.calledWith('Some event name', { someProp: 'someVal' })
done()
, 1020)
lines = @_req.body.split('\n')
expect(lines.length).to.equal(2)
expect(lines[1]).to.equal('')
msg = JSON.parse(lines[0])
expect(msg).to.deep.equal({ message: 'Hello there!', timestamp: 0, isSystem: true })

View File

@ -1 +1 @@
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"http://0.0.0.0:3000","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","pubnubSubscribeKey":"foo","pubnubPublishKey":"bar","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod","supervisorOfflineMode":true}
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"http://0.0.0.0:3000","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod","supervisorOfflineMode":true}

View File

@ -1 +1 @@
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"http://0.0.0.0:3000","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","pubnubSubscribeKey":"foo","pubnubPublishKey":"bar","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod"}
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"http://0.0.0.0:3000","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod"}

View File

@ -1 +1 @@
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"https://api.resin.io","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","pubnubSubscribeKey":"foo","pubnubPublishKey":"bar","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod"}
{"applicationName":"supertestrpi3","applicationId":78373,"deviceType":"raspberrypi3","userId":1001,"username":"someone","appUpdatePollInterval":3000,"listenPort":2345,"vpnPort":443,"apiEndpoint":"https://api.resin.io","vpnEndpoint":"vpn.resin.io","registryEndpoint":"registry2.resin.io","deltaEndpoint":"https://delta.resin.io","mixpanelToken":"baz","apiKey":"boo","version":"2.0.6+rev3.prod"}

View File

@ -42,8 +42,6 @@ runSupervisor() {
-e BOOT_MOUNTPOINT=$BOOT_MOUNTPOINT \
-e API_ENDPOINT=$API_ENDPOINT \
-e REGISTRY_ENDPOINT=$REGISTRY_ENDPOINT \
-e PUBNUB_SUBSCRIBE_KEY=$PUBNUB_SUBSCRIBE_KEY \
-e PUBNUB_PUBLISH_KEY=$PUBNUB_PUBLISH_KEY \
-e MIXPANEL_TOKEN=$MIXPANEL_TOKEN \
-e DELTA_ENDPOINT=$DELTA_ENDPOINT \
-e LED_FILE=${LED_FILE} \
@ -62,4 +60,4 @@ elif [ "$SUPERVISOR_IMAGE_ID" = "$SUPERVISOR_CONTAINER_IMAGE_ID" ]; then
else
# No supervisor container exists or there's a different supervisor image to run
runSupervisor
fi
fi