From 54731c2d204803fbf87c1a5c57989762071ae359 Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Sat, 7 Nov 2020 22:28:14 +0000 Subject: [PATCH] Livepush, logs: Automatically reconnect on 'Connection to device lost' Change-type: minor --- doc/cli.markdown | 5 ++ lib/commands/logs.ts | 22 ++++++--- lib/utils/compose.js | 14 +++--- lib/utils/device/deploy.ts | 68 +++++++++++++------------- lib/utils/device/live.ts | 1 + lib/utils/device/logs.ts | 96 ++++++++++++++++++++++++++++++++----- lib/utils/helpers.ts | 61 +++++++++++++++-------- tests/commands/logs.spec.ts | 21 ++++++-- 8 files changed, 205 insertions(+), 83 deletions(-) diff --git a/doc/cli.markdown b/doc/cli.markdown index df6b7fbf..cb7d91db 100644 --- a/doc/cli.markdown +++ b/doc/cli.markdown @@ -1506,6 +1506,11 @@ device UUID, IP, or .local address ### Options +#### --max-retry MAX-RETRY + +Maximum number of reconnection attempts on "connection lost" errors +(use 0 to disable auto reconnection). + #### -t, --tail continuously stream output diff --git a/lib/commands/logs.ts b/lib/commands/logs.ts index 205b650f..1a083ae5 100644 --- a/lib/commands/logs.ts +++ b/lib/commands/logs.ts @@ -23,6 +23,7 @@ import { LogMessage } from 'balena-sdk'; import { IArg } from '@oclif/parser/lib/args'; interface FlagsDef { + 'max-retry'?: number; tail?: boolean; service?: string[]; system?: boolean; @@ -33,6 +34,8 @@ interface ArgsDef { device: string; } +const MAX_RETRY = 1000; + export default class LogsCmd extends Command { public static description = stripIndent` Show device logs. @@ -75,6 +78,11 @@ export default class LogsCmd extends Command { public static usage = 'logs '; public static flags: flags.Input = { + 'max-retry': flags.integer({ + description: stripIndent` + Maximum number of reconnection attempts on "connection lost" errors + (use 0 to disable auto reconnection).`, + }), tail: flags.boolean({ default: false, description: 'continuously stream output', @@ -105,7 +113,7 @@ export default class LogsCmd extends Command { const balena = getBalenaSdk(); const { serviceIdToName } = await import('../utils/cloud'); - const { displayDeviceLogs, displayLogObject } = await import( + const { connectAndDisplayDeviceLogs, displayLogObject } = await import( '../utils/device/logs' ); const { validateIPAddress, validateDotLocalUrl } = await import( @@ -153,13 +161,13 @@ export default class LogsCmd extends Command { } logger.logDebug('Streaming logs'); - const logStream = await deviceApi.getLogStream(); - await displayDeviceLogs( - logStream, + await connectAndDisplayDeviceLogs({ + deviceApi, logger, - options.system || false, - options.service, - ); + system: options.system || false, + filterServices: options.service, + maxAttempts: 1 + (options['max-retry'] ?? MAX_RETRY), + }); } else { // Logs from cloud await Command.checkLoggedIn(); diff --git a/lib/utils/compose.js b/lib/utils/compose.js index a11b867e..3784db5e 100644 --- a/lib/utils/compose.js +++ b/lib/utils/compose.js @@ -366,15 +366,15 @@ export const pushAndUpdateServiceImages = function ( images.map(({ serviceImage, localImage, props, logs }, index) => Promise.all([ localImage.inspect().then((img) => img.Size), - retry( + retry({ // @ts-ignore - () => progress.push(localImage.name, reporters[index], opts), - 3, // `times` - retry 3 times + func: () => progress.push(localImage.name, reporters[index], opts), + maxAttempts: 3, // try calling func 3 times (max) // @ts-ignore - localImage.name, // `label` included in retry log messages - 2000, // `delayMs` - wait 2 seconds before the 1st retry - 1.4, // `backoffScaler` - wait multiplier for each retry - ).finally(renderer.end), + label: localImage.name, // label for retry log messages + initialDelayMs: 2000, // wait 2 seconds before the 1st retry + backoffScaler: 1.4, // wait multiplier for each retry + }).finally(renderer.end), ]) .then( /** @type {([number, string]) => void} */ diff --git a/lib/utils/device/deploy.ts b/lib/utils/device/deploy.ts index 3a556374..f057f9f5 100644 --- a/lib/utils/device/deploy.ts +++ b/lib/utils/device/deploy.ts @@ -123,7 +123,6 @@ async function environmentFromInput( export async function deployToDevice(opts: DeviceDeployOptions): Promise { const { exitWithExpectedError } = await import('../../errors'); - const { displayDeviceLogs } = await import('./logs'); // Resolve .local addresses to IP to avoid // issue with Windows and rapid repeat lookups. @@ -220,6 +219,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { buildLogs, ); + globalLogger.outputDeferredMessages(); + // Print a newline to clearly separate build time and runtime + console.log(); + const envs = await environmentFromInput( opts.env, Object.getOwnPropertyNames(project.composition.services), @@ -244,10 +247,11 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { // Now that we've set the target state, the device will do it's thing // so we can either just display the logs, or start a livepush session // (whilst also display logs) + const promises: Array> = [streamDeviceLogs(api, opts)]; + let livepush: LivepushManager | null = null; + if (!opts.nolive) { - // Print a newline to clear seperate build time and runtime - console.log(); - const livepush = new LivepushManager({ + livepush = new LivepushManager({ api, buildContext: opts.source, buildTasks, @@ -257,43 +261,37 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { buildLogs: buildLogs!, deployOpts: opts, }); - - const promises: Array> = [livepush.init()]; - // Only show logs if we're not detaching - if (!opts.detached) { - const logStream = await api.getLogStream(); - globalLogger.logInfo('Streaming device logs...'); - promises.push( - displayDeviceLogs(logStream, globalLogger, opts.system, opts.services), - ); - } else { + promises.push(livepush.init()); + if (opts.detached) { globalLogger.logLivepush( 'Running in detached mode, no service logs will be shown', ); } globalLogger.logLivepush('Watching for file changes...'); - globalLogger.outputDeferredMessages(); - await Promise.all(promises); - - livepush.close(); - } else { - if (opts.detached) { - return; - } - // Print an empty newline to separate the build output - // from the device output - console.log(); - // Now all we need to do is stream back the logs - const logStream = await api.getLogStream(); - globalLogger.logInfo('Streaming device logs...'); - globalLogger.outputDeferredMessages(); - await displayDeviceLogs( - logStream, - globalLogger, - opts.system, - opts.services, - ); } + await Promise.all(promises).finally(() => { + // Stop watching files after log streaming ends (e.g. on SIGINT) + livepush?.close(); + }); +} + +async function streamDeviceLogs( + deviceApi: DeviceAPI, + opts: DeviceDeployOptions, +) { + // Only show logs if we're not detaching + if (opts.detached) { + return; + } + globalLogger.logInfo('Streaming device logs...'); + const { connectAndDisplayDeviceLogs } = await import('./logs'); + return connectAndDisplayDeviceLogs({ + deviceApi, + logger: globalLogger, + system: opts.system || false, + filterServices: opts.services, + maxAttempts: 1001, + }); } function connectToDocker(host: string, port: number): Docker { diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index a310320d..7ede20c4 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -283,6 +283,7 @@ export class LivepushManager { return monitor; } + /** Stop the filesystem watcher, allowing the Node process to exit gracefully */ public close() { for (const container of Object.values(this.containers)) { container.monitor.close().catch((err) => { diff --git a/lib/utils/device/logs.ts b/lib/utils/device/logs.ts index 828551ed..ef7b00f4 100644 --- a/lib/utils/device/logs.ts +++ b/lib/utils/device/logs.ts @@ -1,9 +1,33 @@ +/** + * @license + * Copyright 2018-2020 Balena Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import ColorHash = require('color-hash'); import * as _ from 'lodash'; import type { Readable } from 'stream'; import { getChalk } from '../lazy'; import Logger = require('../logger'); +import { ExpectedError } from '../../errors'; + +class DeviceConnectionLostError extends ExpectedError { + public static defaultMsg = 'Connection to device lost'; + constructor(msg?: string) { + super(msg || DeviceConnectionLostError.defaultMsg); + } +} interface Log { message: string; @@ -32,24 +56,74 @@ interface BuildLog { * @param filterService Filter the logs so that only logs * from a single service will be displayed */ -export function displayDeviceLogs( +async function displayDeviceLogs( logs: Readable, logger: Logger, system: boolean, filterServices?: string[], ): Promise { - return new Promise((resolve, reject) => { - logs.on('data', (log) => { - displayLogLine(log, logger, system, filterServices); + let gotSignal = false; + const handleSignal = () => { + gotSignal = true; + logs.emit('close'); + }; + process.once('SIGINT', handleSignal); + process.once('SIGTERM', handleSignal); + try { + await new Promise((resolve, reject) => { + logs.on('data', (log) => { + displayLogLine(log, logger, system, filterServices); + }); + logs.once('error', reject); + logs.once('end', () => { + logger.logWarn(DeviceConnectionLostError.defaultMsg); + if (gotSignal) { + resolve(); + } else { + reject(new DeviceConnectionLostError()); + } + }); }); - logs.once('error', reject); - logs.once('end', () => { - logger.logWarn('Connection to device lost'); - resolve(); + } finally { + process.removeListener('SIGINT', handleSignal); + process.removeListener('SIGTERM', handleSignal); + } +} + +export async function connectAndDisplayDeviceLogs({ + deviceApi, + logger, + system, + filterServices, + maxAttempts = 3, +}: { + deviceApi: import('./api').DeviceAPI; + logger: Logger; + system: boolean; + filterServices?: string[]; + maxAttempts?: number; +}) { + async function connectAndDisplay() { + // Open a new connection to the device's supervisor, TCP port 48484 + const logStream = await deviceApi.getLogStream(); + return displayDeviceLogs(logStream, logger, system, filterServices); + } + + const { retry } = await import('../../utils/helpers'); + try { + await retry({ + func: connectAndDisplay, + maxAttempts, + label: 'Streaming logs', }); - process.once('SIGINT', () => logs.emit('close')); - process.once('SIGTERM', () => logs.emit('close')); - }); + } catch (err) { + if (err instanceof DeviceConnectionLostError) { + err.message = `Max retry count (${ + maxAttempts - 1 + }) exceeded while attempting to reconnect to the device`; + } + throw err; + } } export function displayBuildLog(log: BuildLog, logger: Logger): void { diff --git a/lib/utils/helpers.ts b/lib/utils/helpers.ts index 5f652f52..5bdaa873 100644 --- a/lib/utils/helpers.ts +++ b/lib/utils/helpers.ts @@ -204,39 +204,62 @@ function getApplication( ) as Promise; } +const second = 1000; // 1000 milliseconds +const minute = 60 * second; export const delay = promisify(setTimeout); /** * Call `func`, and if func() throws an error or returns a promise that - * eventually rejects, retry it `times` many times, each time printing a - * log message including the given `label` and the error that led to - * retrying. Wait delayMs before the first retry, multiplying the wait - * by backoffScaler for each further attempt. + * eventually rejects, retry it `times` many times, each time printing a log + * message including the given `label` and the error that led to retrying. + * Wait initialDelayMs before the first retry. Before each further retry, + * the delay is reduced by the time elapsed since the last retry, and + * increased by multiplying the result by backoffScaler. * @param func: The function to call and, if needed, retry calling - * @param times: How many times to retry calling func() + * @param maxAttempts: How many times (max) to try calling func(). + * func() will always be called at least once. * @param label: Label to include in the retry log message - * @param startingDelayMs: How long to wait before the first retry + * @param initialDelayMs: How long to wait before the first retry * @param backoffScaler: Multiplier to previous wait time - * @param count: Used "internally" for the recursive calls + * @param maxSingleDelayMs: Maximum interval between retries */ -export async function retry( - func: () => T, - times: number, - label: string, - startingDelayMs = 1000, +export async function retry({ + func, + maxAttempts, + label, + initialDelayMs = 1000, backoffScaler = 2, -): Promise { - for (let count = 0; count < times - 1; count++) { + maxSingleDelayMs = 1 * minute, +}: { + func: () => T; + maxAttempts: number; + label: string; + initialDelayMs?: number; + backoffScaler?: number; + maxSingleDelayMs?: number; +}): Promise { + let delayMs = initialDelayMs; + for (let count = 0; count < maxAttempts - 1; count++) { + const lastAttemptMs = Date.now(); try { return await func(); } catch (err) { - const delayMS = backoffScaler ** count * startingDelayMs; + if (count) { + // use Math.max to work around system time changes, e.g. DST + const elapsedMs = Math.max(0, Date.now() - lastAttemptMs); + // reduce delayMs by the time elapsed since the last attempt + delayMs = Math.max(initialDelayMs, delayMs - elapsedMs); + // increase delayMs by the backoffScaler factor + delayMs = Math.min(maxSingleDelayMs, delayMs * backoffScaler); + } + const sec = delayMs / 1000; + const secStr = sec < 10 ? sec.toFixed(1) : Math.round(sec).toString(); console.log( - `Retrying "${label}" after ${(delayMS / 1000).toFixed(2)}s (${ - count + 1 - } of ${times}) due to: ${err}`, + `Retrying "${label}" after ${secStr}s (${count + 1} of ${ + maxAttempts - 1 + }) due to: ${err}`, ); - await delay(delayMS); + await delay(delayMs); } } return await func(); diff --git a/tests/commands/logs.spec.ts b/tests/commands/logs.spec.ts index 49148920..08e0da43 100644 --- a/tests/commands/logs.spec.ts +++ b/tests/commands/logs.spec.ts @@ -46,20 +46,33 @@ describe('balena logs', function () { itS('should reach the expected endpoints on a local device', async () => { supervisor.expectGetPing(); supervisor.expectGetLogs(); + supervisor.expectGetLogs(); - const { err, out } = await runCommand('logs 1.2.3.4'); + const { err, out } = await runCommand('logs 1.2.3.4 --max-retry 1'); - expect(err).to.be.empty; + const errLines = cleanOutput(err, true); + const errMsg = + 'Max retry count (1) exceeded while attempting to reconnect to the device'; + if (process.env.DEBUG) { + expect(errLines).to.include(errMsg); + } else { + expect(errLines).to.have.members([errMsg]); + } const removeTimestamps = (logLine: string) => logLine.replace(/(?<=\[Logs\]) \[.+?\]/, ''); const cleanedOut = cleanOutput(out, true).map((l) => removeTimestamps(l)); - expect(cleanedOut).to.deep.equal([ + expect(cleanedOut).to.have.members([ '[Logs] Streaming logs', '[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', '[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', - '[Error] Connection to device lost', + '[Warn] Connection to device lost', + 'Retrying "Streaming logs" after 1.0s (1 of 1) due to: DeviceConnectionLostError: Connection to device lost', + '[Logs] Streaming logs', + '[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', + '[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', + '[Warn] Connection to device lost', ]); }); });