diff --git a/package-lock.json b/package-lock.json index d1ccb3cc..b46d14f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "balena-supervisor", - "version": "9.2.6", + "version": "9.2.8", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -243,6 +243,15 @@ "@types/node": "*" } }, + "@types/morgan": { + "version": "1.7.35", + "resolved": "https://registry.npmjs.org/@types/morgan/-/morgan-1.7.35.tgz", + "integrity": "sha512-E9qFi0seOkdlQnCTPv54brNfGWeFdRaEhI5tSue4pdx/V+xfxvMETsxXhOEcj1cYL+0n/jcTEmj/jD2gjzCwMg==", + "dev": true, + "requires": { + "@types/express": "*" + } + }, "@types/mz": { "version": "0.0.32", "resolved": "https://registry.npmjs.org/@types/mz/-/mz-0.0.32.tgz", diff --git a/package.json b/package.json index 4f044737..b117e3df 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "@types/lodash": "^4.14.119", "@types/memoizee": "^0.4.2", "@types/mkdirp": "^0.5.2", + "@types/morgan": "^1.7.35", "@types/mz": "0.0.32", "@types/node": "^10.12.17", "@types/request": "^2.48.1", diff --git a/src/api-binder.coffee b/src/api-binder.coffee deleted file mode 100644 index 064ddd75..00000000 --- a/src/api-binder.coffee +++ /dev/null @@ -1,488 +0,0 @@ -Promise = require 'bluebird' -_ = require 'lodash' -url = require 'url' -TypedError = require 'typed-error' -{ PinejsClientRequest } = require 'pinejs-client-request' -deviceRegister = require 'resin-register-device' -express = require 'express' -bodyParser = require 'body-parser' -Lock = require 'rwlock' -path = require 'path' -{ request, requestOpts } = require './lib/request' -{ checkTruthy, checkInt } = require './lib/validation' -{ pathExistsOnHost } = require './lib/fs-utils' -constants = require './lib/constants' - -DuplicateUuidError = (err) -> - _.startsWith(err.message, '"uuid" must be unique') - -ExchangeKeyError = class ExchangeKeyError extends TypedError - -REPORT_SUCCESS_DELAY = 1000 -MAX_REPORT_RETRY_DELAY = 60000 - -INTERNAL_STATE_KEYS = [ - 'update_pending', - 'update_downloaded', - 'update_failed', -] - -createAPIBinderRouter = (apiBinder) -> - router = express.Router() - router.use(bodyParser.urlencoded(extended: true)) - router.use(bodyParser.json()) - router.post '/v1/update', (req, res) -> - apiBinder.eventTracker.track('Update notification') - if apiBinder.readyForUpdates - apiBinder.getAndSetTargetState(req.body.force, true) - .catchReturn() - res.sendStatus(204) - return router - -module.exports = class APIBinder - constructor: ({ @config, @db, @deviceState, @eventTracker }) -> - @balenaApi = null - @cachedBalenaApi = null - @lastReportedState = { local: {}, dependent: {} } - @stateForReport = { local: {}, dependent: {} } - @lastTarget = {} - @lastTargetStateFetch = process.hrtime() - @_targetStateInterval = null - @reportPending = false - @stateReportErrors = 0 - @targetStateFetchErrors = 0 - @router = createAPIBinderRouter(this) - _lock = new Lock() - @_writeLock = Promise.promisify(_lock.async.writeLock) - @readyForUpdates = false - - healthcheck: => - @config.getMany([ 'appUpdatePollInterval', 'unmanaged', 'connectivityCheckEnabled' ]) - .then (conf) => - if conf.unmanaged - return true - timeSinceLastFetch = process.hrtime(@lastTargetStateFetch) - timeSinceLastFetchMs = timeSinceLastFetch[0] * 1000 + timeSinceLastFetch[1] / 1e6 - stateFetchHealthy = timeSinceLastFetchMs < 2 * conf.appUpdatePollInterval - stateReportHealthy = !conf.connectivityCheckEnabled or !@deviceState.connected or @stateReportErrors < 3 - return stateFetchHealthy and stateReportHealthy - - _lockGetTarget: => - @_writeLock('getTarget').disposer (release) -> - release() - - initClient: => - @config.getMany([ 'unmanaged', 'apiEndpoint', 'currentApiKey' ]) - .then ({ unmanaged, apiEndpoint, currentApiKey }) => - if unmanaged - console.log('Unmanaged Mode is set, skipping API client initialization') - return - baseUrl = url.resolve(apiEndpoint, '/v5/') - passthrough = _.cloneDeep(requestOpts) - passthrough.headers ?= {} - passthrough.headers.Authorization = "Bearer #{currentApiKey}" - @balenaApi = new PinejsClientRequest - apiPrefix: baseUrl - passthrough: passthrough - @cachedBalenaApi = @balenaApi.clone({}, cache: {}) - - loadBackupFromMigration: (retryDelay) => - pathExistsOnHost(path.join('mnt/data', constants.migrationBackupFile)) - .then (exists) => - if !exists - return - console.log('Migration backup detected') - @getTargetState() - .then (targetState) => - @deviceState.restoreBackup(targetState) - .catch (err) => - console.log('Error restoring migration backup, retrying: ', err) - Promise.delay(retryDelay) - .then => - @loadBackupFromMigration(retryDelay) - - start: => - @config.getMany([ 'apiEndpoint', 'unmanaged', 'bootstrapRetryDelay' ]) - .then ({ apiEndpoint, unmanaged, bootstrapRetryDelay }) => - if unmanaged - console.log('Unmanaged Mode is set, skipping API binder initialization') - # If we are offline because there is no apiEndpoint, there's a chance - # we've went through a deprovision. We need to set the initialConfigReported - # value to '', to ensure that when we do re-provision, we'll report - # the config and hardward-specific options won't be lost - if !Boolean(apiEndpoint) - return @config.set({ initialConfigReported: '' }) - return - console.log('Ensuring device is provisioned') - @provisionDevice() - .then => - @config.getMany([ 'initialConfigReported', 'apiEndpoint' ]) - .then ({ initialConfigReported, apiEndpoint }) => - - # Either we haven't reported our initial config or we've - # been re-provisioned - if apiEndpoint != initialConfigReported - console.log('Reporting initial configuration') - @reportInitialConfig(apiEndpoint, bootstrapRetryDelay) - .then => - console.log('Starting current state report') - @startCurrentStateReport() - .then => - @loadBackupFromMigration(bootstrapRetryDelay) - .then => - @readyForUpdates = true - console.log('Starting target state poll') - @startTargetStatePoll() - return null - - fetchDevice: (uuid, apiKey, timeout) => - reqOpts = { - resource: 'device' - options: - $filter: - uuid: uuid - passthrough: - headers: Authorization: "Bearer #{apiKey}" - } - @balenaApi.get(reqOpts) - .get(0) - .catchReturn(null) - .timeout(timeout) - - _exchangeKeyAndGetDevice: (opts) -> - Promise.try => - if !opts? - @config.get('provisioningOptions') - .then (conf) -> - opts = conf - .then => - # If we have an existing device key we first check if it's valid, because if it is we can just use that - if opts.deviceApiKey? - @fetchDevice(opts.uuid, opts.deviceApiKey, opts.apiTimeout) - .then (device) => - if device? - return device - # If it's not valid/doesn't exist then we try to use the user/provisioning api key for the exchange - @fetchDevice(opts.uuid, opts.provisioningApiKey, opts.apiTimeout) - .tap (device) -> - if not device? - throw new ExchangeKeyError("Couldn't fetch device with provisioning key") - # We found the device, we can try to register a working device key for it - request.postAsync("#{opts.apiEndpoint}/api-key/device/#{device.id}/device-key", { - json: true - body: - apiKey: opts.deviceApiKey - headers: - Authorization: "Bearer #{opts.provisioningApiKey}" - }) - .spread (res, body) -> - if res.statusCode != 200 - throw new ExchangeKeyError("Couldn't register device key with provisioning key") - .timeout(opts.apiTimeout) - - _exchangeKeyAndGetDeviceOrRegenerate: (opts) => - @_exchangeKeyAndGetDevice(opts) - .tap -> - console.log('Key exchange succeeded, all good') - .tapCatch ExchangeKeyError, (err) => - # If it fails we just have to reregister as a provisioning key doesn't have the ability to change existing devices - console.log('Exchanging key failed, having to reregister') - @config.regenerateRegistrationFields() - - _provision: => - @config.get('provisioningOptions') - .then (opts) => - if opts.registered_at? and opts.deviceId? and !opts.provisioningApiKey? - return - Promise.try => - if opts.registered_at? and !opts.deviceId? - console.log('Device is registered but no device id available, attempting key exchange') - @_exchangeKeyAndGetDeviceOrRegenerate(opts) - else if !opts.registered_at? - console.log('New device detected. Provisioning...') - deviceRegister.register(opts) - .timeout(opts.apiTimeout) - .catch DuplicateUuidError, => - console.log('UUID already registered, trying a key exchange') - @_exchangeKeyAndGetDeviceOrRegenerate(opts) - .tap -> - opts.registered_at = Date.now() - else if opts.provisioningApiKey? - console.log('Device is registered but we still have an apiKey, attempting key exchange') - @_exchangeKeyAndGetDevice(opts) - .then ({ id }) => - @balenaApi.passthrough.headers.Authorization = "Bearer #{opts.deviceApiKey}" - configToUpdate = { - registered_at: opts.registered_at - deviceId: id - apiKey: null - } - @config.set(configToUpdate) - .then => - @eventTracker.track('Device bootstrap success') - # Check if we need to pin the device, regardless of if we provisioned - .then => - @config.get('pinDevice') - .then(JSON.parse) - .tapCatch -> - console.log('Warning: Malformed pinDevice value in supervisor database') - .catchReturn(null) - .then (pinValue) => - if pinValue? - if !pinValue.app? or !pinValue.commit? - console.log("Malformed pinDevice fields in supervisor database: #{pinValue}") - return - console.log('Attempting to pin device to preloaded release...') - @pinDevice(pinValue) - - _provisionOrRetry: (retryDelay) => - @eventTracker.track('Device bootstrap') - @_provision() - .catch (err) => - @eventTracker.track('Device bootstrap failed, retrying', { error: err, delay: retryDelay }) - Promise.delay(retryDelay).then => - @_provisionOrRetry(retryDelay) - - provisionDevice: => - if !@balenaApi? - throw new Error('Trying to provision device without initializing API client') - @config.getMany([ - 'provisioned' - 'bootstrapRetryDelay' - 'apiKey' - 'pinDevice' - ]) - .tap (conf) => - if !conf.provisioned or conf.apiKey? or conf.pinDevice? - @_provisionOrRetry(conf.bootstrapRetryDelay) - - provisionDependentDevice: (device) => - @config.getMany([ - 'unmanaged' - 'provisioned' - 'apiTimeout' - 'userId' - 'deviceId' - ]) - .then (conf) => - if conf.unmanaged - throw new Error('Cannot provision dependent device in unmanaged mode') - if !conf.provisioned - throw new Error('Device must be provisioned to provision a dependent device') - # TODO: when API supports it as per https://github.com/resin-io/hq/pull/949 remove userId - _.defaults(device, { - belongs_to__user: conf.userId - is_managed_by__device: conf.deviceId - uuid: deviceRegister.generateUniqueKey() - registered_at: Math.floor(Date.now() / 1000) - }) - @balenaApi.post - resource: 'device' - body: device - .timeout(conf.apiTimeout) - - patchDevice: (id, updatedFields) => - @config.getMany([ - 'unmanaged' - 'provisioned' - 'apiTimeout' - ]) - .then (conf) => - if conf.unmanaged - throw new Error('Cannot update dependent device in unmanaged mode') - if !conf.provisioned - throw new Error('Device must be provisioned to update a dependent device') - @balenaApi.patch - resource: 'device' - id: id - body: updatedFields - .timeout(conf.apiTimeout) - - pinDevice: ({ app, commit }) => - @config.get('deviceId') - .then (deviceId) => - @balenaApi.get - resource: 'release' - options: - $filter: - belongs_to__application: app - commit: commit - status: 'success' - $select: 'id' - .then (release) => - releaseId = _.get(release, '[0].id') - if !releaseId? - throw new Error('Cannot continue pinning preloaded device! No release found!') - @balenaApi.patch - resource: 'device' - id: deviceId - body: - should_be_running__release: releaseId - .then => - # Set the config value for pinDevice to null, so that we know the - # task has been completed - @config.remove('pinDevice') - .tapCatch (e) -> - console.log('Could not pin device to release!') - console.log('Error: ', e) - - # Creates the necessary config vars in the API to match the current device state, - # without overwriting any variables that are already set. - _reportInitialEnv: (apiEndpoint) => - defaultConfig = @deviceState.deviceConfig.getDefaults() - Promise.join( - @deviceState.getCurrentForComparison() - @getTargetState().then (targetState) => - @deviceState.deviceConfig.formatConfigKeys(targetState.local.config) - @config.get('deviceId') - (currentState, targetConfig, deviceId) => - currentConfig = currentState.local.config - Promise.mapSeries _.toPairs(currentConfig), ([ key, value ]) => - # We want to disable local mode when joining a cloud - if key == 'SUPERVISOR_LOCAL_MODE' - value = 'false' - # We never want to disable VPN if, for instance, it failed to start so far - if key == 'SUPERVISOR_VPN_CONTROL' - value = 'true' - if !targetConfig[key]? and value != defaultConfig[key] - # At some point the namespace in the API should change to BALENA_ - # but for now we only have RESIN_ - envVar = { - value - device: deviceId - name: 'RESIN_' + key - } - @balenaApi.post - resource: 'device_config_variable' - body: envVar - ) - .then => - @config.set({ initialConfigReported: apiEndpoint }) - - reportInitialConfig: (apiEndpoint, retryDelay) => - @_reportInitialEnv(apiEndpoint) - .catch (err) => - console.error('Error reporting initial configuration, will retry', err) - Promise.delay(retryDelay) - .then => - @reportInitialConfig(apiEndpoint, retryDelay) - - getTargetState: => - @config.getMany([ 'uuid', 'apiEndpoint', 'apiTimeout' ]) - .then ({ uuid, apiEndpoint, apiTimeout }) => - endpoint = url.resolve(apiEndpoint, "/device/v2/#{uuid}/state") - - requestParams = _.extend - method: 'GET' - url: "#{endpoint}" - , @cachedBalenaApi.passthrough - - @cachedBalenaApi._request(requestParams) - .timeout(apiTimeout) - - # Get target state from API, set it on @deviceState and trigger a state application - getAndSetTargetState: (force, isFromAPI = false) => - Promise.using @_lockGetTarget(), => - @getTargetState() - .then (targetState) => - if isFromAPI or !_.isEqual(targetState, @lastTarget) - @deviceState.setTarget(targetState) - .then => - @lastTarget = _.cloneDeep(targetState) - @deviceState.triggerApplyTarget({ force }) - .tapCatch (err) -> - console.error("Failed to get target state for device: #{err}") - .finally => - @lastTargetStateFetch = process.hrtime() - - _pollTargetState: => - @getAndSetTargetState() - .then => - @targetStateFetchErrors = 0 - @config.get('appUpdatePollInterval') - .catch => - @targetStateFetchErrors += 1 - @config.get('appUpdatePollInterval') - .then (appUpdatePollInterval) => - Math.min(appUpdatePollInterval, 15000 * 2 ** (@targetStateFetchErrors - 1)) - .then(checkInt) - .then(Promise.delay) - .then(@_pollTargetState) - - startTargetStatePoll: => - Promise.try => - if !@balenaApi? - throw new Error('Trying to start poll without initializing API client') - @_pollTargetState() - return null - - _getStateDiff: => - diff = { - local: _(@stateForReport.local) - .omitBy((val, key) => _.isEqual(@lastReportedState.local[key], val)) - .omit(INTERNAL_STATE_KEYS) - .value() - dependent: _(@stateForReport.dependent) - .omitBy((val, key) => _.isEqual(@lastReportedState.dependent[key], val)) - .omit(INTERNAL_STATE_KEYS) - .value() - } - return _.pickBy(diff, _.negate(_.isEmpty)) - - _sendReportPatch: (stateDiff, conf) => - endpoint = url.resolve(conf.apiEndpoint, "/device/v2/#{conf.uuid}/state") - requestParams = _.extend - method: 'PATCH' - url: "#{endpoint}" - body: stateDiff - , @cachedBalenaApi.passthrough - - @cachedBalenaApi._request(requestParams) - - _report: => - @config.getMany([ 'deviceId', 'apiTimeout', 'apiEndpoint', 'uuid', 'localMode' ]) - .then (conf) => - return if checkTruthy(conf.localMode) - stateDiff = @_getStateDiff() - if _.size(stateDiff) is 0 - return - @_sendReportPatch(stateDiff, conf) - .timeout(conf.apiTimeout) - .then => - @stateReportErrors = 0 - _.assign(@lastReportedState.local, stateDiff.local) - _.assign(@lastReportedState.dependent, stateDiff.dependent) - - _reportCurrentState: => - @reportPending = true - @deviceState.getStatus() - .then (currentDeviceState) => - _.assign(@stateForReport.local, currentDeviceState.local) - _.assign(@stateForReport.dependent, currentDeviceState.dependent) - stateDiff = @_getStateDiff() - if _.size(stateDiff) is 0 - @reportPending = false - return - @_report() - .delay(REPORT_SUCCESS_DELAY) - .then => - @_reportCurrentState() - .catch (err) => - @stateReportErrors += 1 - @eventTracker.track('Device state report failure', { error: err }) - delay = Math.min((2 ** @stateReportErrors) * 500, MAX_REPORT_RETRY_DELAY) - Promise.delay(delay) - .then => - @_reportCurrentState() - return null - - startCurrentStateReport: => - if !@balenaApi? - throw new Error('Trying to start state reporting without initializing API client') - # patch to the device(id) endpoint - @deviceState.on 'change', => - if !@reportPending - # A latency of 100 ms should be acceptable and - # allows avoiding catching docker at weird states - @_reportCurrentState() - @_reportCurrentState() diff --git a/src/api-binder.ts b/src/api-binder.ts new file mode 100644 index 00000000..7a728476 --- /dev/null +++ b/src/api-binder.ts @@ -0,0 +1,889 @@ +import * as Bluebird from 'bluebird'; +import * as bodyParser from 'body-parser'; +import * as express from 'express'; +import * as _ from 'lodash'; +import * as Path from 'path'; +import { PinejsClientRequest } from 'pinejs-client-request'; +import * as deviceRegister from 'resin-register-device'; +import * as url from 'url'; + +import Config from './config'; +import Database from './db'; +import DeviceConfig from './device-config'; +import { EventTracker } from './event-tracker'; + +import * as constants from './lib/constants'; +import { + DuplicateUuidError, + ExchangeKeyError, + InternalInconsistencyError, +} from './lib/errors'; +import { pathExistsOnHost } from './lib/fs-utils'; +import { request, requestOpts } from './lib/request'; +import { ConfigValue } from './lib/types'; +import { writeLock } from './lib/update-lock'; +import { checkInt, checkTruthy } from './lib/validation'; +import { DeviceApplicationState } from './types/state'; + +const REPORT_SUCCESS_DELAY = 1000; +const MAX_REPORT_RETRY_DELAY = 60000; + +const INTERNAL_STATE_KEYS = [ + 'update_pending', + 'update_downloaded', + 'update_failed', +]; + +interface APIBinderConstructOpts { + config: Config; + // FIXME: Remove this + db: Database; + // TODO: Typings + deviceState: { + deviceConfig: DeviceConfig; + [key: string]: any; + }; + eventTracker: EventTracker; +} + +interface KeyExchangeOpts { + uuid: ConfigValue; + deviceApiKey: ConfigValue; + apiTimeout: ConfigValue; + apiEndpoint: ConfigValue; + provisioningApiKey: ConfigValue; +} + +interface Device { + id: string; + + [key: string]: unknown; +} + +interface DevicePinInfo { + app: number; + commit: string; +} + +export class APIBinder { + public router: express.Router; + + private config: Config; + private deviceState: { + deviceConfig: DeviceConfig; + [key: string]: any; + }; + private eventTracker: EventTracker; + + private balenaApi: PinejsClientRequest | null = null; + private cachedBalenaApi: PinejsClientRequest | null = null; + private lastReportedState: DeviceApplicationState = { + local: {}, + dependent: {}, + }; + private stateForReport: DeviceApplicationState = { + local: {}, + dependent: {}, + }; + private lastTarget: DeviceApplicationState = {}; + private lastTargetStateFetch = process.hrtime(); + private reportPending = false; + private stateReportErrors = 0; + private targetStateFetchErrors = 0; + private readyForUpdates = false; + + public constructor({ + config, + deviceState, + eventTracker, + }: APIBinderConstructOpts) { + this.config = config; + this.deviceState = deviceState; + this.eventTracker = eventTracker; + + this.router = this.createAPIBinderRouter(this); + } + + public async healthcheck() { + const { + appUpdatePollInterval, + unmanaged, + connectivityCheckEnabled, + } = await this.config.getMany([ + 'appUpdatePollInterval', + 'unmanaged', + 'connectivityCheckEnabled', + ]); + + if (unmanaged) { + return true; + } + + if (appUpdatePollInterval == null) { + return false; + } + + const timeSinceLastFetch = process.hrtime(this.lastTargetStateFetch); + const timeSinceLastFetchMs = + timeSinceLastFetch[0] * 1000 + timeSinceLastFetch[1] / 1e6; + const stateFetchHealthy = + timeSinceLastFetchMs < 2 * (appUpdatePollInterval as number); + const stateReportHealthy = + !connectivityCheckEnabled || + !this.deviceState.connected || + this.stateReportErrors < 3; + + return stateFetchHealthy && stateReportHealthy; + } + + public async initClient() { + const { unmanaged, apiEndpoint, currentApiKey } = await this.config.getMany( + ['unmanaged', 'apiEndpoint', 'currentApiKey'], + ); + + if (unmanaged) { + console.log('Unmanaged mode is set, skipping API client initialization'); + return; + } + + const baseUrl = url.resolve(apiEndpoint as string, '/v5/'); + const passthrough = _.cloneDeep(requestOpts); + passthrough.headers = + passthrough.headers != null ? passthrough.headers : {}; + passthrough.headers.Authorization = `Bearer ${currentApiKey}`; + this.balenaApi = new PinejsClientRequest({ + apiPrefix: baseUrl, + passthrough, + }); + this.cachedBalenaApi = this.balenaApi.clone({}, { cache: {} }); + } + + public async loadBackupFromMigration(retryDelay: number): Promise { + try { + const exists = await pathExistsOnHost( + Path.join('mnt/data', constants.migrationBackupFile), + ); + if (!exists) { + return; + } + console.log('Migration backup detected'); + const targetState = await this.getTargetState(); + await this.deviceState.restoreBackup(targetState); + } catch (err) { + console.log('Error restoring migration backup, retrying: ', err); + + await Bluebird.delay(retryDelay); + return this.loadBackupFromMigration(retryDelay); + } + } + + public async start() { + const conf = await this.config.getMany([ + 'apiEndpoint', + 'unmanaged', + 'bootstrapRetryDelay', + ]); + let { apiEndpoint } = conf; + const { unmanaged, bootstrapRetryDelay } = conf; + + if (unmanaged) { + console.log('Unmanaged mode is set, skipping API binder initialization'); + // If we are offline because there is no apiEndpoint, there's a chance + // we've went through a deprovision. We need to set the initialConfigReported + // value to '', to ensure that when we do re-provision, we'll report + // the config and hardward-specific options won't be lost + if (!apiEndpoint) { + await this.config.set({ initialConfigReported: '' }); + } + return; + } + + console.log('Ensuring device is provisioned'); + await this.provisionDevice(); + const conf2 = await this.config.getMany([ + 'initialConfigReported', + 'apiEndpoint', + ]); + apiEndpoint = conf2.apiEndpoint; + const { initialConfigReported } = conf2; + + // Either we haven't reported our initial config or we've been re-provisioned + if (apiEndpoint !== initialConfigReported) { + console.log('Reporting initial configuration'); + await this.reportInitialConfig( + apiEndpoint as string, + bootstrapRetryDelay as number, + ); + } + + console.log('Starting current state report'); + await this.startCurrentStateReport(); + + await this.loadBackupFromMigration(bootstrapRetryDelay as number); + + this.readyForUpdates = true; + console.log('Starting target state poll'); + this.startTargetStatePoll(); + } + + public async fetchDevice( + uuid: string, + apiKey: string, + timeout: number, + ): Promise { + const reqOpts = { + resource: 'device', + options: { + $filter: { + uuid, + }, + }, + passthrough: { + headers: { + Authorization: `Bearer ${apiKey}`, + }, + }, + }; + + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'fetchDevice called without an initialized API client', + ); + } + + try { + const res = (await this.balenaApi + .get(reqOpts) + .timeout(timeout)) as Device[]; + return res[0]; + } catch (e) { + return null; + } + } + + public async patchDevice(id: number, updatedFields: Dictionary) { + const conf = await this.config.getMany([ + 'unmanaged', + 'provisioned', + 'apiTimeout', + ]); + + if (conf.unmanaged) { + throw new Error('Cannot update device in unmanaged mode'); + } + + if (!conf.provisioned) { + throw new Error('DEvice must be provisioned to update a device'); + } + + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to patch device without an API client', + ); + } + + return this.balenaApi + .patch({ + resource: 'device', + id, + body: updatedFields, + }) + .timeout(conf.apiTimeout as number); + } + + public async provisionDependentDevice(device: Device): Promise { + const conf = await this.config.getMany([ + 'unmanaged', + 'provisioned', + 'apiTimeout', + 'userId', + 'deviceId', + ]); + + if (conf.unmanaged) { + throw new Error('Cannot provision dependent device in unmanaged mode'); + } + if (!conf.provisioned) { + throw new Error( + 'Device must be provisioned to provision a dependent device', + ); + } + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to provision a dependent device without an API client', + ); + } + + // TODO: When API supports it as per https://github.com/resin-io/hq/pull/949 remove userId + _.defaults(device, { + belongs_to__user: conf.userId, + is_managed_by__device: conf.deviceId, + uuid: deviceRegister.generateUniqueKey(), + registered_at: Math.floor(Date.now() / 1000), + }); + + return (await this.balenaApi + .post({ resource: 'device', body: device }) + // TODO: Remove the `as number` when we fix the config typings + .timeout(conf.apiTimeout as number)) as Device; + } + + public async getTargetState(): Promise { + const { uuid, apiEndpoint, apiTimeout } = await this.config.getMany([ + 'uuid', + 'apiEndpoint', + 'apiTimeout', + ]); + + if (!_.isString(apiEndpoint)) { + throw new InternalInconsistencyError( + 'Non-string apiEndpoint passed to ApiBinder.getTargetState', + ); + } + if (this.cachedBalenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to get target state without an API client', + ); + } + + const endpoint = url.resolve(apiEndpoint, `/device/v2/${uuid}/state`); + const requestParams = _.extend( + { method: 'GET', url: endpoint }, + this.cachedBalenaApi.passthrough, + ); + + return await this.cachedBalenaApi + ._request(requestParams) + .timeout(apiTimeout as number); + } + + // TODO: Once 100% typescript, change this to a native promise + public startTargetStatePoll(): Bluebird { + return Bluebird.try(() => { + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Trying to start poll without initializing API client', + ); + } + this.pollTargetState(); + return null; + }); + } + + public startCurrentStateReport() { + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Trying to start state reporting without initializing API client', + ); + } + this.deviceState.on('change', () => { + if (!this.reportPending) { + // A latency of 100ms should be acceptable and + // allows avoiding catching docker at weird states + this.reportCurrentState(); + } + }); + return this.reportCurrentState(); + } + + private getStateDiff(): DeviceApplicationState { + const lastReportedLocal = this.lastReportedState.local; + const lastReportedDependent = this.lastReportedState.dependent; + if (lastReportedLocal == null || lastReportedDependent == null) { + throw new InternalInconsistencyError( + `No local or dependent component of lastReportedLocal in ApiBinder.getStateDiff: ${JSON.stringify( + this.lastReportedState, + )}`, + ); + } + + const diff = { + local: _(this.stateForReport.local) + .omitBy((val, key: keyof DeviceApplicationState['local']) => + _.isEqual(lastReportedLocal[key], val), + ) + .omit(INTERNAL_STATE_KEYS) + .value(), + dependent: _(this.stateForReport.dependent) + .omitBy((val, key: keyof DeviceApplicationState['dependent']) => + _.isEqual(lastReportedDependent[key], val), + ) + .omit(INTERNAL_STATE_KEYS) + .value(), + }; + + return _.omitBy(diff, _.isEmpty); + } + + private async sendReportPatch( + stateDiff: DeviceApplicationState, + conf: { apiEndpoint: string; uuid: string }, + ) { + if (this.cachedBalenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to send report patch without an API client', + ); + } + + const endpoint = url.resolve( + conf.apiEndpoint, + `/device/v2/${conf.uuid}/state`, + ); + + const requestParams = _.extend( + { + method: 'PATCH', + url: endpoint, + body: stateDiff, + }, + this.cachedBalenaApi.passthrough, + ); + + await this.cachedBalenaApi._request(requestParams); + } + + private async report() { + const conf = await this.config.getMany([ + 'deviceId', + 'apiTimeout', + 'apiEndpoint', + 'uuid', + 'localMode', + ]); + + if (checkTruthy(conf.localMode || false)) { + return; + } + + const stateDiff = this.getStateDiff(); + if (_.size(stateDiff) === 0) { + return 0; + } + + await Bluebird.resolve( + this.sendReportPatch(stateDiff, conf as { + uuid: string; + apiEndpoint: string; + }), + ).timeout(conf.apiTimeout as number); + + this.stateReportErrors = 0; + _.assign(this.lastReportedState.local, stateDiff.local); + _.assign(this.lastReportedState.dependent, stateDiff.dependent); + } + + private reportCurrentState(): null { + (async () => { + this.reportPending = true; + try { + const currentDeviceState = await this.deviceState.getStatus(); + _.assign(this.stateForReport.local, currentDeviceState.local); + _.assign(this.stateForReport.dependent, currentDeviceState.dependent); + + const stateDiff = this.getStateDiff(); + if (_.size(stateDiff) === 0) { + this.reportPending = false; + return null; + } + + await this.report(); + await Bluebird.delay(REPORT_SUCCESS_DELAY); + await this.reportCurrentState(); + } catch (e) { + this.eventTracker.track('Device state report failure', { error: e }); + const delay = Math.min( + 2 ** this.stateReportErrors * 500, + MAX_REPORT_RETRY_DELAY, + ); + + ++this.stateReportErrors; + await Bluebird.delay(delay); + await this.reportCurrentState(); + } + })(); + return null; + } + + private getAndSetTargetState(force: boolean, isFromApi = false) { + return Bluebird.using(this.lockGetTarget(), async () => { + const targetState = await this.getTargetState(); + if (isFromApi || !_.isEqual(targetState, this.lastTarget)) { + await this.deviceState.setTarget(targetState); + this.lastTarget = _.cloneDeep(targetState); + this.deviceState.triggerApplyTarget({ force }); + } + }) + .tapCatch(err => { + console.error(`Failed to get target state for device: ${err}`); + }) + .finally(() => { + this.lastTargetStateFetch = process.hrtime(); + }); + } + + private async pollTargetState(): Promise { + // TODO: Remove the checkInt here with the config changes + let pollInterval = checkInt((await this.config.get( + 'appUpdatePollInterval', + )) as string); + if (!_.isNumber(pollInterval)) { + throw new InternalInconsistencyError( + 'appUpdatePollInterval not a number in ApiBinder.pollTargetState', + ); + } + + try { + await this.getAndSetTargetState(false); + this.targetStateFetchErrors = 0; + } catch (e) { + pollInterval = Math.min( + pollInterval, + 15000 * 2 ** this.targetStateFetchErrors, + ); + ++this.targetStateFetchErrors; + } + + return Bluebird.delay(pollInterval).then(this.pollTargetState); + } + + private async pinDevice({ app, commit }: DevicePinInfo) { + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to pin device without an API client', + ); + } + + try { + const deviceId = await this.config.get('deviceId'); + const release = await this.balenaApi.get({ + resource: 'release', + options: { + $filter: { + belongs_to__application: app, + commit, + status: 'success', + }, + $select: 'id', + }, + }); + + const releaseId = _.get(release, '[0].id'); + if (releaseId == null) { + throw new Error( + 'Cannot continue pinning preloaded device! No release found!', + ); + } + + await this.balenaApi.patch({ + resource: 'device', + id: deviceId as number, + body: { + should_be_running__release: releaseId, + }, + }); + + // Set the config value for pinDevice to null, so that we know the + // task has been completed + await this.config.remove('pinDevice'); + } catch (e) { + console.log('Could not pin device to release!'); + console.log('Error: ', e); + throw e; + } + } + + // Creates the necessary config vars in the API to match the current device state, + // without overwriting any variables that are already set. + private async reportInitialEnv(apiEndpoint: string) { + if (this.balenaApi == null) { + throw new InternalInconsistencyError( + 'Attempt to report initial environment without an API client', + ); + } + + const targetConfigUnformatted = _.get( + await this.getTargetState(), + 'local.config', + ); + if (targetConfigUnformatted == null) { + throw new InternalInconsistencyError( + 'Attempt to report initial state with malformed target state', + ); + } + + const defaultConfig = this.deviceState.deviceConfig.getDefaults(); + + const currentState = await this.deviceState.getCurrentForComparison(); + const targetConfig = await this.deviceState.deviceConfig.formatConfigKeys( + targetConfigUnformatted, + ); + const deviceId = await this.config.get('deviceId'); + + const currentConfig: Dictionary = currentState.local.config; + for (const [key, value] of _.toPairs(currentConfig)) { + let varValue = value; + // We want to disable local mode when joining a cloud + if (key === 'SUPERVISOR_LOCAL_MODE') { + varValue = 'false'; + } + // We never want to disable VPN if, for instance, it failed to start so far + if (key === 'SUPERVISOR_VPN_CONTROL') { + varValue = 'true'; + } + + if (targetConfig[key] == null && value !== defaultConfig[key]) { + const envVar = { + varValue, + device: deviceId, + name: 'RESIN_' + key, + }; + await this.balenaApi.post({ + resource: 'device_config_variable', + body: envVar, + }); + } + } + + await this.config.set({ initialConfigReported: apiEndpoint }); + } + + private async reportInitialConfig( + apiEndpoint: string, + retryDelay: number, + ): Promise { + try { + await this.reportInitialEnv(apiEndpoint); + } catch (err) { + console.error('Error reporting initial configuration, will retry', err); + await Bluebird.delay(retryDelay); + this.reportInitialConfig(apiEndpoint, retryDelay); + } + } + + private async exchangeKeyAndGetDevice( + opts?: KeyExchangeOpts, + ): Promise { + if (opts == null) { + // FIXME: This casting shouldn't be necessary and stems from the + // meta-option provioningOptions not returning a ConfigValue + opts = ((await this.config.get( + 'provisioningOptions', + )) as any) as KeyExchangeOpts; + } + + const uuid = opts.uuid as string; + const apiTimeout = opts.apiTimeout as number; + if (!(uuid && apiTimeout)) { + throw new InternalInconsistencyError( + 'UUID and apiTimeout should be defined in exchangeKeyAndGetDevice', + ); + } + + // If we have an existing device key we first check if it's + // valid, becaise of ot os we can just use that + if (opts.deviceApiKey != null) { + const device = await this.fetchDevice( + uuid, + opts.deviceApiKey as string, + apiTimeout, + ); + if (device != null) { + return device; + } + } + + // If it's not valid or doesn't exist then we try to use the + // user/provisioning api key for the exchange + const device = await this.fetchDevice( + uuid, + opts.provisioningApiKey as string, + apiTimeout, + ); + + if (device == null) { + throw new ExchangeKeyError(`Couldn't fetch device with provisioning key`); + } + + // We found the device so we can try to register a working device key for it + const [res] = await request + .postAsync(`${opts.apiEndpoint}/api-key/device/${device.id}/device-key`, { + json: true, + body: { + apiKey: opts.deviceApiKey, + }, + headers: { + Authorization: `Bearer ${opts.provisioningApiKey}`, + }, + }) + .timeout(apiTimeout); + + if (res.statusCode !== 200) { + throw new ExchangeKeyError( + `Couldn't register device key with provisioning key`, + ); + } + + return device; + } + + private async exchangeKeyAndGetDeviceOrRegenerate( + opts?: KeyExchangeOpts, + ): Promise { + try { + const device = await this.exchangeKeyAndGetDevice(opts); + console.log('Key exchange succeeded'); + return device; + } catch (e) { + if (e instanceof ExchangeKeyError) { + console.log('Exchanging key failed, re-registering...'); + await this.config.regenerateRegistrationFields(); + } + throw e; + } + } + + private async provision() { + let device: Device | null = null; + // FIXME: Config typing + const opts = ((await this.config.get( + 'provisioningOptions', + )) as any) as Dictionary; + + if ( + opts.registered_at != null && + opts.deviceId != null && + opts.provisioningApiKey == null + ) { + return; + } + + if (opts.registered_at != null && opts.deviceId == null) { + console.log( + 'Device is registered but no device id available, attempting key exchange', + ); + device = + (await this.exchangeKeyAndGetDeviceOrRegenerate( + opts as KeyExchangeOpts, + )) || null; + } else if (opts.registered_at == null) { + console.log('New device detected. Provisioning...'); + try { + device = await deviceRegister.register(opts).timeout(opts.apiTimeout); + opts.registered_at = Date.now(); + } catch (err) { + if (DuplicateUuidError(err)) { + console.log('UUID already registered, trying a key exchange'); + await this.exchangeKeyAndGetDeviceOrRegenerate( + opts as KeyExchangeOpts, + ); + } else { + throw err; + } + } + } else if (opts.provisioningApiKey != null) { + console.log( + 'Device is registered but we still have an apiKey, attempting key exchange', + ); + device = await this.exchangeKeyAndGetDevice(opts as KeyExchangeOpts); + } + + if (!device) { + // TODO: Type this? + throw new Error(`Failed to provision device!`); + } + const { id } = device; + if (!this.balenaApi) { + throw new InternalInconsistencyError( + 'Attempting to provision a device without an initialized API client', + ); + } + this.balenaApi.passthrough.headers.Authorization = `Bearer ${ + opts.deviceApiKey + }`; + + const configToUpdate = { + registered_at: opts.registered_at, + deviceId: id, + apiKey: null, + }; + await this.config.set(configToUpdate); + this.eventTracker.track('Device bootstrap success'); + + // Now check if we need to pin the device + const toPin = await this.config.get('pinDevice'); + let pinValue: DevicePinInfo | null = null; + try { + pinValue = JSON.parse(toPin as string); + } catch (e) { + console.log('Warning: Malformed pinDevice value in supervisor database'); + } + + if (pinValue != null) { + if (pinValue.app == null || pinValue.commit == null) { + console.log( + `Malformed pinDEvice fields in supervisor database: ${pinValue}`, + ); + return; + } + console.log('Attempting to pin device to preloaded release...'); + return this.pinDevice(pinValue); + } + } + + private async provisionOrRetry(retryDelay: number): Promise { + this.eventTracker.track('Device bootstrap'); + try { + await this.provision(); + } catch (e) { + this.eventTracker.track(`Device bootstrap failed, retrying`, { + error: e, + delay: retryDelay, + }); + await Bluebird.delay(retryDelay); + this.provisionOrRetry(retryDelay); + } + } + + private async provisionDevice() { + if (this.balenaApi == null) { + throw new Error( + 'Trying to provision a device without initializing API client', + ); + } + const conf = await this.config.getMany([ + 'provisioned', + 'bootstrapRetryDelay', + 'apiKey', + 'pinDevice', + ]); + + if (!conf.provisioned || conf.apiKey != null || conf.pinDevice != null) { + return this.provisionOrRetry(conf.bootstrapRetryDelay as number); + } + + return conf; + } + + private lockGetTarget() { + return writeLock('getTarget').disposer(release => { + release(); + }); + } + + private createAPIBinderRouter(apiBinder: APIBinder): express.Router { + const router = express.Router(); + + router.use(bodyParser.urlencoded({ extended: true })); + router.use(bodyParser.json()); + + router.post('/v1/update', (req, res) => { + apiBinder.eventTracker.track('Update notification'); + if (apiBinder.readyForUpdates) { + apiBinder.getAndSetTargetState(req.body.force, true).catch(_.noop); + } + res.sendStatus(204); + }); + + return router; + } +} diff --git a/src/config/configJson.ts b/src/config/configJson.ts index 31bec6ef..953a2a18 100644 --- a/src/config/configJson.ts +++ b/src/config/configJson.ts @@ -2,18 +2,15 @@ import * as Promise from 'bluebird'; import * as _ from 'lodash'; import { fs } from 'mz'; import * as path from 'path'; -import * as Lock from 'rwlock'; import { ConfigSchema, ConfigValue } from '../lib/types'; +import { readLock, writeLock } from '../lib/update-lock'; import * as constants from '../lib/constants'; import { writeAndSyncFile, writeFileAtomic } from '../lib/fs-utils'; import * as osRelease from '../lib/os-release'; -type LockCallback = (file: string) => Promise<() => void>; - export default class ConfigJsonConfigBackend { - private lock: Lock; private readLockConfigJson: () => Promise.Disposer<() => void>; private writeLockConfigJson: () => Promise.Disposer<() => void>; @@ -25,12 +22,7 @@ export default class ConfigJsonConfigBackend { public constructor(schema: ConfigSchema, configPath?: string) { this.configPath = configPath; this.schema = schema; - this.lock = new Lock(); - const writeLock: LockCallback = Promise.promisify( - this.lock.async.writeLock, - ); - const readLock: LockCallback = Promise.promisify(this.lock.async.readLock); this.writeLockConfigJson = () => writeLock('config.json').disposer(release => release()); this.readLockConfigJson = () => diff --git a/src/config/functions.ts b/src/config/functions.ts index 2d05b94c..d1d3d644 100644 --- a/src/config/functions.ts +++ b/src/config/functions.ts @@ -3,9 +3,9 @@ import { Transaction } from 'knex'; import * as _ from 'lodash'; import { URL } from 'url'; -import Config = require('../config'); import supervisorVersion = require('../lib/supervisor-version'); +import Config from '.'; import * as constants from '../lib/constants'; import * as osRelease from '../lib/os-release'; import { ConfigValue } from '../lib/types'; diff --git a/src/config/index.ts b/src/config/index.ts index 15f8ea6c..c8fedc1e 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -17,7 +17,7 @@ interface ConfigOpts { configPath: string; } -class Config extends EventEmitter { +export class Config extends EventEmitter { private db: DB; private configJsonBackend: ConfigJsonConfigBackend; private providerFunctions: ConfigProviderFunctions; @@ -271,7 +271,7 @@ class Config extends EventEmitter { }); } - private newUniqueKey(): string { + public newUniqueKey(): string { return generateUniqueKey(); } @@ -299,4 +299,4 @@ class Config extends EventEmitter { } } -export = Config; +export default Config; diff --git a/src/device-config.ts b/src/device-config.ts index 92fdb53a..e4b11ba2 100644 --- a/src/device-config.ts +++ b/src/device-config.ts @@ -1,6 +1,6 @@ import * as _ from 'lodash'; -import Config = require('./config'); +import Config from './config'; import Database, { Transaction } from './db'; import Logger from './logger'; diff --git a/src/device-state.coffee b/src/device-state.coffee index 9172530a..b0d9ed1c 100644 --- a/src/device-state.coffee +++ b/src/device-state.coffee @@ -1,6 +1,5 @@ Promise = require 'bluebird' _ = require 'lodash' -Lock = require 'rwlock' EventEmitter = require 'events' fs = Promise.promisifyAll(require('fs')) express = require 'express' @@ -122,9 +121,8 @@ module.exports = class DeviceState extends EventEmitter @on 'error', (err) -> console.error('Error in deviceState: ', err, err.stack) @_currentVolatile = {} - _lock = new Lock() - @_writeLock = Promise.promisify(_lock.async.writeLock) - @_readLock = Promise.promisify(_lock.async.readLock) + @_writeLock = updateLock.writeLock + @_readLock = updateLock.readLock @lastSuccessfulUpdate = null @failedUpdates = 0 @applyInProgress = false diff --git a/src/host-config.ts b/src/host-config.ts index 9bed8294..3e8ad972 100644 --- a/src/host-config.ts +++ b/src/host-config.ts @@ -5,7 +5,7 @@ import * as mkdirCb from 'mkdirp'; import { fs } from 'mz'; import * as path from 'path'; -import Config = require('./config'); +import Config from './config'; import * as constants from './lib/constants'; import { ENOENT } from './lib/errors'; import { writeFileAtomic } from './lib/fs-utils'; diff --git a/src/lib/errors.ts b/src/lib/errors.ts index b17a0a9f..20a566a1 100644 --- a/src/lib/errors.ts +++ b/src/lib/errors.ts @@ -1,4 +1,4 @@ -import { endsWith } from 'lodash'; +import { endsWith, startsWith } from 'lodash'; import TypedError = require('typed-error'); import { checkInt } from './validation'; @@ -44,3 +44,11 @@ export class InvalidAppIdError extends TypedError { } export class UpdatesLockedError extends TypedError {} + +export function DuplicateUuidError(err: Error) { + return startsWith(err.message, '"uuid" must be unique'); +} + +export class ExchangeKeyError extends TypedError {} + +export class InternalInconsistencyError extends TypedError {} diff --git a/src/lib/request.ts b/src/lib/request.ts index 167ed802..a1073cd4 100644 --- a/src/lib/request.ts +++ b/src/lib/request.ts @@ -26,7 +26,7 @@ const DEFAULT_REQUEST_TIMEOUT = 30000; // ms const DEFAULT_REQUEST_RETRY_INTERVAL = 10000; // ms const DEFAULT_REQUEST_RETRY_COUNT = 30; -export const requestOpts = { +export const requestOpts: requestLib.CoreOptions = { gzip: true, timeout: DEFAULT_REQUEST_TIMEOUT, headers: { @@ -40,9 +40,20 @@ const resumableOpts = { retryInterval: DEFAULT_REQUEST_RETRY_INTERVAL, }; +type PromisifiedRequest = typeof requestLib & { + postAsync: ( + uri: string | requestLib.CoreOptions, + options?: requestLib.CoreOptions | undefined, + ) => Bluebird; + getAsync: ( + uri: string | requestLib.CoreOptions, + options?: requestLib.CoreOptions | undefined, + ) => Bluebird; +}; + const requestHandle = requestLib.defaults(exports.requestOpts); export const request = Bluebird.promisifyAll(requestHandle, { multiArgs: true, -}); +}) as PromisifiedRequest; export const resumable = resumableRequestLib.defaults(resumableOpts); diff --git a/src/lib/update-lock.ts b/src/lib/update-lock.ts index 4e794c0e..0386f58a 100644 --- a/src/lib/update-lock.ts +++ b/src/lib/update-lock.ts @@ -46,8 +46,14 @@ process.on('exit', () => { } }); +type LockFn = (key: string | number) => Bluebird<() => void>; const locker = new Lock(); -const writeLock = Bluebird.promisify(locker.async.writeLock).bind(locker); +export const writeLock: LockFn = Bluebird.promisify(locker.async.writeLock, { + context: locker, +}); +export const readLock: LockFn = Bluebird.promisify(locker.async.readLock, { + context: locker, +}); function dispose(release: () => void): Bluebird { return Bluebird.map(_.keys(locksTaken), lockName => { @@ -101,5 +107,10 @@ export function lock( .disposer(dispose); }; - return Bluebird.using(takeTheLock(), fn); + const disposer = takeTheLock(); + if (disposer) { + return Bluebird.using(disposer, fn); + } else { + return Bluebird.resolve(fn()); + } } diff --git a/src/local-mode.ts b/src/local-mode.ts index d12049c4..baac6aab 100644 --- a/src/local-mode.ts +++ b/src/local-mode.ts @@ -2,7 +2,7 @@ import * as Bluebird from 'bluebird'; import * as Docker from 'dockerode'; import * as _ from 'lodash'; -import Config = require('./config'); +import Config from './config'; import Database from './db'; import { checkTruthy } from './lib/validation'; import { Logger } from './logger'; diff --git a/src/logger.ts b/src/logger.ts index c89ef35d..f3f195c9 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,11 +1,11 @@ import * as Bluebird from 'bluebird'; import * as es from 'event-stream'; import * as _ from 'lodash'; -import * as Lock from 'rwlock'; import { EventTracker } from './event-tracker'; import Docker = require('./lib/docker-utils'); import { LogType } from './lib/log-types'; +import { writeLock } from './lib/update-lock'; import { LocalLogBackend, LogBackend, @@ -34,10 +34,6 @@ interface LoggerConstructOptions { } export class Logger { - private writeLock: (key: string) => Bluebird<() => void> = Bluebird.promisify( - new Lock().async.writeLock, - ); - private backend: LogBackend | null = null; private balenaBackend: BalenaLogBackend | null = null; private localBackend: LocalLogBackend | null = null; @@ -130,7 +126,7 @@ export class Logger { } public lock(containerId: string): Bluebird.Disposer<() => void> { - return this.writeLock(containerId).disposer(release => { + return writeLock(containerId).disposer(release => { release(); }); } diff --git a/src/supervisor-api.coffee b/src/supervisor-api.coffee deleted file mode 100644 index 0b831bf6..00000000 --- a/src/supervisor-api.coffee +++ /dev/null @@ -1,117 +0,0 @@ -Promise = require 'bluebird' -express = require 'express' -morgan = require 'morgan' -bufferEq = require 'buffer-equal-constant-time' -blink = require './lib/blink' -iptables = require './lib/iptables' -{ checkTruthy } = require './lib/validation' - -authenticate = (config) -> - return (req, res, next) -> - queryKey = req.query.apikey - header = req.get('Authorization') ? '' - match = header.match(/^ApiKey (\w+)$/) - headerKey = match?[1] - config.getMany([ 'apiSecret', 'localMode', 'unmanaged', 'osVariant' ]) - .then (conf) -> - needsAuth = if conf.unmanaged - conf.osVariant is 'prod' - else - not conf.localMode - - if needsAuth - key = queryKey ? headerKey - if bufferEq(Buffer.from(key), Buffer.from(conf.apiSecret)) - next() - else - res.sendStatus(401) - else - next() - .catch (err) -> - res.status(503).send("Unexpected error: #{err}") - -expressLogger = morgan (tokens, req, res) -> [ - 'Supervisor API:' - tokens.method(req, res) - req.path - tokens.status(req, res) - '-' - tokens['response-time'](req, res) - 'ms' - ].join(' ') - -module.exports = class SupervisorAPI - constructor: ({ @config, @eventTracker, @routers, @healthchecks }) -> - @server = null - @_api = express() - @_api.disable('x-powered-by') - @_api.use(expressLogger) - - @_api.get '/v1/healthy', (req, res) => - Promise.map @healthchecks, (fn) -> - fn() - .then (healthy) -> - if !healthy - throw new Error('Unhealthy') - .then -> - res.sendStatus(200) - .catch -> - res.sendStatus(500) - - @_api.use(authenticate(@config)) - - @_api.get '/ping', (req, res) -> - res.send('OK') - - @_api.post '/v1/blink', (req, res) => - @eventTracker.track('Device blink') - blink.pattern.start() - setTimeout(blink.pattern.stop, 15000) - res.sendStatus(200) - - # Expires the supervisor's API key and generates a new one. - # It also communicates the new key to the balena API. - @_api.post '/v1/regenerate-api-key', (req, res) => - @config.newUniqueKey() - .then (secret) => - @config.set(apiSecret: secret) - .then -> - res.status(200).send(secret) - .catch (err) -> - res.status(503).send(err?.message or err or 'Unknown error') - - - for router in @routers - @_api.use(router) - - listen: (allowedInterfaces, port, apiTimeout) => - @config.get('localMode').then (localMode) => - @applyListeningRules(checkTruthy(localMode), port, allowedInterfaces) - .then => - # Monitor the switching of local mode, and change which interfaces will - # be listented to based on that - @config.on 'change', (changedConfig) => - if changedConfig.localMode? - @applyListeningRules(changedConfig.localMode, port, allowedInterfaces) - .then => - @server = @_api.listen(port) - @server.timeout = apiTimeout - - applyListeningRules: (allInterfaces, port, allowedInterfaces) => - Promise.try -> - if checkTruthy(allInterfaces) - iptables.removeRejections(port).then -> - console.log('Supervisor API listening on all interfaces') - else - iptables.rejectOnAllInterfacesExcept(allowedInterfaces, port).then -> - console.log('Supervisor API listening on allowed interfaces only') - .catch (e) => - # If there's an error, stop the supervisor api from answering any endpoints, - # and this will eventually be restarted by the healthcheck - console.log('Error on switching supervisor API listening rules - stopping API.') - console.log(' ', e) - if @server? - @stop() - - stop: -> - @server.close() diff --git a/src/supervisor-api.ts b/src/supervisor-api.ts new file mode 100644 index 00000000..1c640af6 --- /dev/null +++ b/src/supervisor-api.ts @@ -0,0 +1,196 @@ +import * as express from 'express'; +import { Server } from 'http'; +import * as _ from 'lodash'; +import * as morgan from 'morgan'; + +import Config from './config'; +import { EventTracker } from './event-tracker'; +import blink = require('./lib/blink'); +import * as iptables from './lib/iptables'; +import { checkTruthy } from './lib/validation'; + +function getKeyFromReq(req: express.Request): string | null { + const queryKey = req.query.apikey; + if (queryKey != null) { + return queryKey; + } + const maybeHeaderKey = req.get('Authorization'); + if (!maybeHeaderKey) { + return null; + } + + const match = maybeHeaderKey.match(/^ApiKey (\w+)$/); + return match != null ? match[1] : null; +} + +function authenticate(config: Config): express.RequestHandler { + return async (req, res, next) => { + try { + const conf = await config.getMany([ + 'apiSecret', + 'localMode', + 'unmanaged', + 'osVariant', + ]); + + const needsAuth = conf.unmanaged + ? conf.osVariant === 'prod' + : !conf.localMode; + + if (needsAuth) { + // Only get the key if we need it + const key = getKeyFromReq(req); + if (key && conf.apiSecret && key === conf.apiSecret) { + return next(); + } else { + return res.sendStatus(401); + } + } else { + return next(); + } + } catch (err) { + res.status(503).send(`Unexpected error: ${err}`); + } + }; +} + +const expressLogger = morgan((tokens, req, res) => + [ + 'Supervisor API:', + tokens.method(req, res), + req.path, + tokens.status(req, res), + '-', + tokens['response-time'](req, res), + 'ms', + ].join(' '), +); + +interface SupervisorAPIConstructOpts { + config: Config; + eventTracker: EventTracker; + routers: express.Router[]; + healthchecks: Array<() => Promise>; +} + +export class SupervisorAPI { + private config: Config; + private eventTracker: EventTracker; + private routers: express.Router[]; + private healthchecks: Array<() => Promise>; + + private api = express(); + private server: Server | null = null; + + public constructor({ + config, + eventTracker, + routers, + healthchecks, + }: SupervisorAPIConstructOpts) { + this.config = config; + this.eventTracker = eventTracker; + this.routers = routers; + this.healthchecks = healthchecks; + + this.api.disable('x-powered-by'); + this.api.use(expressLogger); + this.api.use(authenticate(this.config)); + + this.api.get('/v1/healthy', async (_req, res) => { + try { + const healths = await Promise.all(this.healthchecks.map(fn => fn())); + if (!_.every(healths)) { + throw new Error('Unhealthy'); + } + return res.sendStatus(200); + } catch (e) { + res.sendStatus(500); + } + }); + + this.api.get('/ping', (_req, res) => res.send('OK')); + + this.api.post('/v1/blink', (_req, res) => { + this.eventTracker.track('Device blink'); + blink.pattern.start(); + setTimeout(blink.pattern.stop, 15000); + return res.sendStatus(200); + }); + + // Expires the supervisor's API key and generates a new one. + // It also communicates the new key to the balena API. + this.api.post('/v1/regenerate-api-key', async (_req, res) => { + try { + const secret = await this.config.newUniqueKey(); + await this.config.set({ apiSecret: secret }); + res.status(200).send(secret); + } catch (e) { + res.status(503).send(e != null ? e.message : e || 'Unknown error'); + } + }); + + // And assign all external routers + for (const router of this.routers) { + this.api.use(router); + } + } + + public async listen( + allowedInterfaces: string[], + port: number, + apiTimeout: number, + ): Promise { + const localMode = (await this.config.get('localMode')) || false; + await this.applyListeningRules( + checkTruthy(localMode) || false, + port, + allowedInterfaces, + ); + + // Monitor the switching of local mode, and change which interfaces will + // be listened to based on that + this.config.on('change', (changedConfig: Dictionary) => { + if (changedConfig.localMode != null) { + this.applyListeningRules( + checkTruthy(changedConfig.localMode || false) || false, + port, + allowedInterfaces, + ); + } + }); + + this.server = this.api.listen(port); + this.server.timeout = apiTimeout; + } + + private async applyListeningRules( + allInterfaces: boolean, + port: number, + allowedInterfaces: string[], + ): Promise { + try { + if (checkTruthy(allInterfaces)) { + await iptables.removeRejections(port); + console.log('Supervisor API listening on all interfaces'); + } else { + await iptables.rejectOnAllInterfacesExcept(allowedInterfaces, port); + console.log('Supervisor API listening on allowed interfaces only'); + } + } catch (err) { + console.log( + 'Error on switching supervisor API listening rules - stopping API.', + ); + console.log(' ', err); + this.stop(); + } + } + + public stop() { + if (this.server != null) { + this.server.close(); + } + } +} + +export default SupervisorAPI; diff --git a/src/supervisor.coffee b/src/supervisor.coffee index 61763a27..4c3f3941 100644 --- a/src/supervisor.coffee +++ b/src/supervisor.coffee @@ -2,10 +2,10 @@ EventEmitter = require 'events' { EventTracker } = require './event-tracker' { DB } = require './db' -Config = require './config' -APIBinder = require './api-binder' +{ Config } = require './config' +{ APIBinder } = require './api-binder' DeviceState = require './device-state' -SupervisorAPI = require './supervisor-api' +{ SupervisorAPI } = require './supervisor-api' { Logger } = require './logger' { checkTruthy } = require './lib/validation' diff --git a/src/types/state.ts b/src/types/state.ts index 7f68d15b..6dd90100 100644 --- a/src/types/state.ts +++ b/src/types/state.ts @@ -13,7 +13,7 @@ export interface DeviceApplicationState { }; }; }; - // TODO - dependent: any; - commit: string; + // TODO: Type the dependent entry correctly + dependent?: any; + commit?: string; } diff --git a/test/03-config.spec.coffee b/test/03-config.spec.coffee index f4590053..21d51c99 100644 --- a/test/03-config.spec.coffee +++ b/test/03-config.spec.coffee @@ -6,7 +6,7 @@ fs = Promise.promisifyAll(require('fs')) m.chai.use(require('chai-events')) { DB } = require('../src/db') -Config = require('../src/config') +{ Config } = require('../src/config') constants = require('../src/lib/constants') describe 'Config', -> diff --git a/test/05-device-state.spec.coffee b/test/05-device-state.spec.coffee index 5d492983..745e7cab 100644 --- a/test/05-device-state.spec.coffee +++ b/test/05-device-state.spec.coffee @@ -8,7 +8,7 @@ m.chai.use(require('chai-events')) prepare = require './lib/prepare' DeviceState = require '../src/device-state' { DB } = require('../src/db') -Config = require('../src/config') +{ Config } = require('../src/config') { RPiConfigBackend } = require('../src/config/backend') { Service } = require '../src/compose/service' diff --git a/test/11-api-binder.spec.coffee b/test/11-api-binder.spec.coffee index 312dd82e..32e3dbe6 100644 --- a/test/11-api-binder.spec.coffee +++ b/test/11-api-binder.spec.coffee @@ -8,9 +8,9 @@ m = require 'mochainon' { stub, spy } = m.sinon { DB } = require('../src/db') -Config = require('../src/config') +{ Config } = require('../src/config') DeviceState = require('../src/device-state') -APIBinder = require('../src/api-binder') +{ APIBinder } = require('../src/api-binder') initModels = (filename) -> prepare() @@ -94,7 +94,7 @@ describe 'APIBinder', -> .then (theDevice) -> expect(theDevice).to.deep.equal(balenaAPI.balenaBackend.devices[3]) - describe '_exchangeKeyAndGetDevice', -> + describe 'exchangeKeyAndGetDevice', -> before -> initModels.call(this, '/config-apibinder.json') @@ -102,7 +102,7 @@ describe 'APIBinder', -> spy(balenaAPI.balenaBackend, 'deviceKeyHandler') fetchDeviceStub = stub(@apiBinder, 'fetchDevice') fetchDeviceStub.onCall(0).resolves({ id: 1 }) - @apiBinder._exchangeKeyAndGetDevice(mockProvisioningOpts) + @apiBinder.exchangeKeyAndGetDevice(mockProvisioningOpts) .then (device) => expect(balenaAPI.balenaBackend.deviceKeyHandler).to.not.be.called expect(device).to.deep.equal({ id: 1 }) @@ -113,7 +113,7 @@ describe 'APIBinder', -> it 'throws if it cannot get the device with any of the keys', -> spy(balenaAPI.balenaBackend, 'deviceKeyHandler') stub(@apiBinder, 'fetchDevice').returns(Promise.resolve(null)) - promise = @apiBinder._exchangeKeyAndGetDevice(mockProvisioningOpts) + promise = @apiBinder.exchangeKeyAndGetDevice(mockProvisioningOpts) promise.catch(->) expect(promise).to.be.rejected .then => @@ -127,7 +127,7 @@ describe 'APIBinder', -> fetchDeviceStub = stub(@apiBinder, 'fetchDevice') fetchDeviceStub.onCall(0).returns(Promise.resolve(null)) fetchDeviceStub.onCall(1).returns(Promise.resolve({ id: 1 })) - @apiBinder._exchangeKeyAndGetDevice(mockProvisioningOpts) + @apiBinder.exchangeKeyAndGetDevice(mockProvisioningOpts) .then (device) => expect(balenaAPI.balenaBackend.deviceKeyHandler).to.be.calledOnce expect(device).to.deep.equal({ id: 1 }) diff --git a/test/14-application-manager.spec.coffee b/test/14-application-manager.spec.coffee index 1985f29d..428d8c56 100644 --- a/test/14-application-manager.spec.coffee +++ b/test/14-application-manager.spec.coffee @@ -9,7 +9,7 @@ m.chai.use(require('chai-events')) prepare = require './lib/prepare' DeviceState = require '../src/device-state' { DB } = require('../src/db') -Config = require('../src/config') +{ Config } = require('../src/config') { Service } = require '../src/compose/service' appDBFormatNormalised = { diff --git a/typings/blinking.d.ts b/typings/blinking.d.ts index de11c5df..77da7fd0 100644 --- a/typings/blinking.d.ts +++ b/typings/blinking.d.ts @@ -7,7 +7,7 @@ declare module 'blinking' { } interface Blink { - start: (pattern: Pattern) => void; + start: (pattern?: Pattern) => void; stop: () => void; } diff --git a/typings/resin-register-device.d.ts b/typings/resin-register-device.d.ts index 681593fa..3fcfa28a 100644 --- a/typings/resin-register-device.d.ts +++ b/typings/resin-register-device.d.ts @@ -3,4 +3,5 @@ // and upstream types to the repo declare module 'resin-register-device' { export function generateUniqueKey(): string; + export function register(opts: Dictionary): Bluebird<{ id: string }>; }