From 54978357280f70196fd6ef535c272980acfb6296 Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Sat, 7 Nov 2020 23:42:48 +0000 Subject: [PATCH 1/4] Livepush: Fix process not exiting on "Connection to device lost" Resolves: #1828 Change-type: patch --- lib/utils/device/deploy.ts | 2 ++ lib/utils/device/live.ts | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/lib/utils/device/deploy.ts b/lib/utils/device/deploy.ts index 96e802de..3a556374 100644 --- a/lib/utils/device/deploy.ts +++ b/lib/utils/device/deploy.ts @@ -274,6 +274,8 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { globalLogger.logLivepush('Watching for file changes...'); globalLogger.outputDeferredMessages(); await Promise.all(promises); + + livepush.close(); } else { if (opts.detached) { return; diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index 77865dc5..3cd4d7d4 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -290,6 +290,16 @@ export class LivepushManager { return monitor; } + public close() { + for (const container of Object.values(this.containers)) { + container.monitor.close().catch((err) => { + if (process.env.DEBUG) { + this.logger.logDebug(`chokidar.close() ${err.message}`); + } + }); + } + } + public static preprocessDockerfile(content: string): string { return new Dockerfile(content).generateLiveDockerfile(); } From d00db5ea8c6bb33bcc9abe6c2dfd71c93b7e910d Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Sat, 7 Nov 2020 23:45:03 +0000 Subject: [PATCH 2/4] logs: Fix CTRL-C ignored on Windows (PowerShell, MSYS, Git for Windows) Change-type: patch --- lib/utils/device/live.ts | 11 +----- lib/utils/device/logs.ts | 9 +++-- lib/utils/remote-build.ts | 63 ++++++++++++++++++------------- patches/all/exit-hook+1.1.1.patch | 17 +++++++++ 4 files changed, 61 insertions(+), 39 deletions(-) create mode 100644 patches/all/exit-hook+1.1.1.patch diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index 3cd4d7d4..a310320d 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -221,21 +221,14 @@ export class LivepushManager { } // Setup cleanup handlers for the device - - // This is necessary because the `exit-hook` module is used by several - // dependencies, and will exit without calling the following handler. - // Once https://github.com/balena-io/balena-cli/issues/867 has been solved, - // we are free to (and definitely should) remove the below line - process.removeAllListeners('SIGINT'); - process.on('SIGINT', async () => { + process.once('SIGINT', async () => { this.logger.logLivepush('Cleaning up device...'); await Promise.all( _.map(this.containers, (container) => { container.livepush.cleanupIntermediateContainers(); }), ); - - process.exit(0); + this.logger.logDebug('Cleaning up done.'); }); } diff --git a/lib/utils/device/logs.ts b/lib/utils/device/logs.ts index be8fab4d..828551ed 100644 --- a/lib/utils/device/logs.ts +++ b/lib/utils/device/logs.ts @@ -42,12 +42,13 @@ export function displayDeviceLogs( logs.on('data', (log) => { displayLogLine(log, logger, system, filterServices); }); - - logs.on('error', reject); - logs.on('end', () => { - logger.logError('Connection to device lost'); + logs.once('error', reject); + logs.once('end', () => { + logger.logWarn('Connection to device lost'); resolve(); }); + process.once('SIGINT', () => logs.emit('close')); + process.once('SIGTERM', () => logs.emit('close')); }); } diff --git a/lib/utils/remote-build.ts b/lib/utils/remote-build.ts index 0e080abf..39951f14 100644 --- a/lib/utils/remote-build.ts +++ b/lib/utils/remote-build.ts @@ -123,32 +123,7 @@ export async function startRemoteBuild(build: RemoteBuild): Promise { } if (!build.opts.headless) { - return new Promise((resolve, reject) => { - // Setup interrupt handlers so we can cancel the build if the user presses - // ctrl+c - - // This is necessary because the `exit-hook` module is used by several - // dependencies, and will exit without calling the following handler. - // Once https://github.com/balena-io/balena-cli/issues/867 has been solved, - // we are free to (and definitely should) remove the below line - process.removeAllListeners('SIGINT'); - process.on('SIGINT', () => { - process.stderr.write('Received SIGINT, cleaning up. Please wait.\n'); - cancelBuildIfNecessary(build).then(() => { - stream.end(); - process.exit(130); - }); - }); - - stream.on('data', getBuilderMessageHandler(build)); - stream.on('end', resolve); - stream.on('error', reject); - }).then(() => { - globalLogger.outputDeferredMessages(); - if (build.hadError) { - throw new RemoteBuildFailedError(); - } - }); + return awaitRemoteBuildStream(build, stream); } // We're running a headless build, which means we'll @@ -166,6 +141,42 @@ export async function startRemoteBuild(build: RemoteBuild): Promise { handleHeadlessBuildMessage(result); } +async function awaitRemoteBuildStream( + build: RemoteBuild, + stream: NodeJS.ReadWriteStream, +) { + let sigintHandler: (() => Promise) | null = null; + try { + await new Promise((resolve, reject) => { + // Setup interrupt handlers so we can cancel the build if the user presses + // ctrl+c + sigintHandler = async () => { + process.exitCode = 130; + console.error('Received SIGINT, cleaning up. Please wait.'); + try { + await cancelBuildIfNecessary(build); + } catch (err) { + console.error(err.message); + } finally { + stream.end(); + } + }; + process.once('SIGINT', sigintHandler); + stream.on('data', getBuilderMessageHandler(build)); + stream.on('end', resolve); + stream.on('error', reject); + }); + } finally { + if (sigintHandler) { + process.removeListener('SIGINT', sigintHandler); + } + globalLogger.outputDeferredMessages(); + } + if (build.hadError) { + throw new RemoteBuildFailedError(); + } +} + function handleHeadlessBuildMessage(message: HeadlessBuilderMessage) { if (!process.stdout.isTTY) { process.stdout.write(JSON.stringify(message)); diff --git a/patches/all/exit-hook+1.1.1.patch b/patches/all/exit-hook+1.1.1.patch new file mode 100644 index 00000000..72745300 --- /dev/null +++ b/patches/all/exit-hook+1.1.1.patch @@ -0,0 +1,17 @@ +diff --git a/node_modules/exit-hook/index.js b/node_modules/exit-hook/index.js +index e18013f..3366356 100644 +--- a/node_modules/exit-hook/index.js ++++ b/node_modules/exit-hook/index.js +@@ -14,9 +14,9 @@ function exit(exit, signal) { + el(); + }); + +- if (exit === true) { +- process.exit(128 + signal); +- } ++ // if (exit === true) { ++ // process.exit(128 + signal); ++ // } + }; + + module.exports = function (cb) { From 54731c2d204803fbf87c1a5c57989762071ae359 Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Sat, 7 Nov 2020 22:28:14 +0000 Subject: [PATCH 3/4] Livepush, logs: Automatically reconnect on 'Connection to device lost' Change-type: minor --- doc/cli.markdown | 5 ++ lib/commands/logs.ts | 22 ++++++--- lib/utils/compose.js | 14 +++--- lib/utils/device/deploy.ts | 68 +++++++++++++------------- lib/utils/device/live.ts | 1 + lib/utils/device/logs.ts | 96 ++++++++++++++++++++++++++++++++----- lib/utils/helpers.ts | 61 +++++++++++++++-------- tests/commands/logs.spec.ts | 21 ++++++-- 8 files changed, 205 insertions(+), 83 deletions(-) diff --git a/doc/cli.markdown b/doc/cli.markdown index df6b7fbf..cb7d91db 100644 --- a/doc/cli.markdown +++ b/doc/cli.markdown @@ -1506,6 +1506,11 @@ device UUID, IP, or .local address ### Options +#### --max-retry MAX-RETRY + +Maximum number of reconnection attempts on "connection lost" errors +(use 0 to disable auto reconnection). + #### -t, --tail continuously stream output diff --git a/lib/commands/logs.ts b/lib/commands/logs.ts index 205b650f..1a083ae5 100644 --- a/lib/commands/logs.ts +++ b/lib/commands/logs.ts @@ -23,6 +23,7 @@ import { LogMessage } from 'balena-sdk'; import { IArg } from '@oclif/parser/lib/args'; interface FlagsDef { + 'max-retry'?: number; tail?: boolean; service?: string[]; system?: boolean; @@ -33,6 +34,8 @@ interface ArgsDef { device: string; } +const MAX_RETRY = 1000; + export default class LogsCmd extends Command { public static description = stripIndent` Show device logs. @@ -75,6 +78,11 @@ export default class LogsCmd extends Command { public static usage = 'logs '; public static flags: flags.Input = { + 'max-retry': flags.integer({ + description: stripIndent` + Maximum number of reconnection attempts on "connection lost" errors + (use 0 to disable auto reconnection).`, + }), tail: flags.boolean({ default: false, description: 'continuously stream output', @@ -105,7 +113,7 @@ export default class LogsCmd extends Command { const balena = getBalenaSdk(); const { serviceIdToName } = await import('../utils/cloud'); - const { displayDeviceLogs, displayLogObject } = await import( + const { connectAndDisplayDeviceLogs, displayLogObject } = await import( '../utils/device/logs' ); const { validateIPAddress, validateDotLocalUrl } = await import( @@ -153,13 +161,13 @@ export default class LogsCmd extends Command { } logger.logDebug('Streaming logs'); - const logStream = await deviceApi.getLogStream(); - await displayDeviceLogs( - logStream, + await connectAndDisplayDeviceLogs({ + deviceApi, logger, - options.system || false, - options.service, - ); + system: options.system || false, + filterServices: options.service, + maxAttempts: 1 + (options['max-retry'] ?? MAX_RETRY), + }); } else { // Logs from cloud await Command.checkLoggedIn(); diff --git a/lib/utils/compose.js b/lib/utils/compose.js index a11b867e..3784db5e 100644 --- a/lib/utils/compose.js +++ b/lib/utils/compose.js @@ -366,15 +366,15 @@ export const pushAndUpdateServiceImages = function ( images.map(({ serviceImage, localImage, props, logs }, index) => Promise.all([ localImage.inspect().then((img) => img.Size), - retry( + retry({ // @ts-ignore - () => progress.push(localImage.name, reporters[index], opts), - 3, // `times` - retry 3 times + func: () => progress.push(localImage.name, reporters[index], opts), + maxAttempts: 3, // try calling func 3 times (max) // @ts-ignore - localImage.name, // `label` included in retry log messages - 2000, // `delayMs` - wait 2 seconds before the 1st retry - 1.4, // `backoffScaler` - wait multiplier for each retry - ).finally(renderer.end), + label: localImage.name, // label for retry log messages + initialDelayMs: 2000, // wait 2 seconds before the 1st retry + backoffScaler: 1.4, // wait multiplier for each retry + }).finally(renderer.end), ]) .then( /** @type {([number, string]) => void} */ diff --git a/lib/utils/device/deploy.ts b/lib/utils/device/deploy.ts index 3a556374..f057f9f5 100644 --- a/lib/utils/device/deploy.ts +++ b/lib/utils/device/deploy.ts @@ -123,7 +123,6 @@ async function environmentFromInput( export async function deployToDevice(opts: DeviceDeployOptions): Promise { const { exitWithExpectedError } = await import('../../errors'); - const { displayDeviceLogs } = await import('./logs'); // Resolve .local addresses to IP to avoid // issue with Windows and rapid repeat lookups. @@ -220,6 +219,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { buildLogs, ); + globalLogger.outputDeferredMessages(); + // Print a newline to clearly separate build time and runtime + console.log(); + const envs = await environmentFromInput( opts.env, Object.getOwnPropertyNames(project.composition.services), @@ -244,10 +247,11 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { // Now that we've set the target state, the device will do it's thing // so we can either just display the logs, or start a livepush session // (whilst also display logs) + const promises: Array> = [streamDeviceLogs(api, opts)]; + let livepush: LivepushManager | null = null; + if (!opts.nolive) { - // Print a newline to clear seperate build time and runtime - console.log(); - const livepush = new LivepushManager({ + livepush = new LivepushManager({ api, buildContext: opts.source, buildTasks, @@ -257,43 +261,37 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { buildLogs: buildLogs!, deployOpts: opts, }); - - const promises: Array> = [livepush.init()]; - // Only show logs if we're not detaching - if (!opts.detached) { - const logStream = await api.getLogStream(); - globalLogger.logInfo('Streaming device logs...'); - promises.push( - displayDeviceLogs(logStream, globalLogger, opts.system, opts.services), - ); - } else { + promises.push(livepush.init()); + if (opts.detached) { globalLogger.logLivepush( 'Running in detached mode, no service logs will be shown', ); } globalLogger.logLivepush('Watching for file changes...'); - globalLogger.outputDeferredMessages(); - await Promise.all(promises); - - livepush.close(); - } else { - if (opts.detached) { - return; - } - // Print an empty newline to separate the build output - // from the device output - console.log(); - // Now all we need to do is stream back the logs - const logStream = await api.getLogStream(); - globalLogger.logInfo('Streaming device logs...'); - globalLogger.outputDeferredMessages(); - await displayDeviceLogs( - logStream, - globalLogger, - opts.system, - opts.services, - ); } + await Promise.all(promises).finally(() => { + // Stop watching files after log streaming ends (e.g. on SIGINT) + livepush?.close(); + }); +} + +async function streamDeviceLogs( + deviceApi: DeviceAPI, + opts: DeviceDeployOptions, +) { + // Only show logs if we're not detaching + if (opts.detached) { + return; + } + globalLogger.logInfo('Streaming device logs...'); + const { connectAndDisplayDeviceLogs } = await import('./logs'); + return connectAndDisplayDeviceLogs({ + deviceApi, + logger: globalLogger, + system: opts.system || false, + filterServices: opts.services, + maxAttempts: 1001, + }); } function connectToDocker(host: string, port: number): Docker { diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index a310320d..7ede20c4 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -283,6 +283,7 @@ export class LivepushManager { return monitor; } + /** Stop the filesystem watcher, allowing the Node process to exit gracefully */ public close() { for (const container of Object.values(this.containers)) { container.monitor.close().catch((err) => { diff --git a/lib/utils/device/logs.ts b/lib/utils/device/logs.ts index 828551ed..ef7b00f4 100644 --- a/lib/utils/device/logs.ts +++ b/lib/utils/device/logs.ts @@ -1,9 +1,33 @@ +/** + * @license + * Copyright 2018-2020 Balena Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import ColorHash = require('color-hash'); import * as _ from 'lodash'; import type { Readable } from 'stream'; import { getChalk } from '../lazy'; import Logger = require('../logger'); +import { ExpectedError } from '../../errors'; + +class DeviceConnectionLostError extends ExpectedError { + public static defaultMsg = 'Connection to device lost'; + constructor(msg?: string) { + super(msg || DeviceConnectionLostError.defaultMsg); + } +} interface Log { message: string; @@ -32,24 +56,74 @@ interface BuildLog { * @param filterService Filter the logs so that only logs * from a single service will be displayed */ -export function displayDeviceLogs( +async function displayDeviceLogs( logs: Readable, logger: Logger, system: boolean, filterServices?: string[], ): Promise { - return new Promise((resolve, reject) => { - logs.on('data', (log) => { - displayLogLine(log, logger, system, filterServices); + let gotSignal = false; + const handleSignal = () => { + gotSignal = true; + logs.emit('close'); + }; + process.once('SIGINT', handleSignal); + process.once('SIGTERM', handleSignal); + try { + await new Promise((resolve, reject) => { + logs.on('data', (log) => { + displayLogLine(log, logger, system, filterServices); + }); + logs.once('error', reject); + logs.once('end', () => { + logger.logWarn(DeviceConnectionLostError.defaultMsg); + if (gotSignal) { + resolve(); + } else { + reject(new DeviceConnectionLostError()); + } + }); }); - logs.once('error', reject); - logs.once('end', () => { - logger.logWarn('Connection to device lost'); - resolve(); + } finally { + process.removeListener('SIGINT', handleSignal); + process.removeListener('SIGTERM', handleSignal); + } +} + +export async function connectAndDisplayDeviceLogs({ + deviceApi, + logger, + system, + filterServices, + maxAttempts = 3, +}: { + deviceApi: import('./api').DeviceAPI; + logger: Logger; + system: boolean; + filterServices?: string[]; + maxAttempts?: number; +}) { + async function connectAndDisplay() { + // Open a new connection to the device's supervisor, TCP port 48484 + const logStream = await deviceApi.getLogStream(); + return displayDeviceLogs(logStream, logger, system, filterServices); + } + + const { retry } = await import('../../utils/helpers'); + try { + await retry({ + func: connectAndDisplay, + maxAttempts, + label: 'Streaming logs', }); - process.once('SIGINT', () => logs.emit('close')); - process.once('SIGTERM', () => logs.emit('close')); - }); + } catch (err) { + if (err instanceof DeviceConnectionLostError) { + err.message = `Max retry count (${ + maxAttempts - 1 + }) exceeded while attempting to reconnect to the device`; + } + throw err; + } } export function displayBuildLog(log: BuildLog, logger: Logger): void { diff --git a/lib/utils/helpers.ts b/lib/utils/helpers.ts index 5f652f52..5bdaa873 100644 --- a/lib/utils/helpers.ts +++ b/lib/utils/helpers.ts @@ -204,39 +204,62 @@ function getApplication( ) as Promise; } +const second = 1000; // 1000 milliseconds +const minute = 60 * second; export const delay = promisify(setTimeout); /** * Call `func`, and if func() throws an error or returns a promise that - * eventually rejects, retry it `times` many times, each time printing a - * log message including the given `label` and the error that led to - * retrying. Wait delayMs before the first retry, multiplying the wait - * by backoffScaler for each further attempt. + * eventually rejects, retry it `times` many times, each time printing a log + * message including the given `label` and the error that led to retrying. + * Wait initialDelayMs before the first retry. Before each further retry, + * the delay is reduced by the time elapsed since the last retry, and + * increased by multiplying the result by backoffScaler. * @param func: The function to call and, if needed, retry calling - * @param times: How many times to retry calling func() + * @param maxAttempts: How many times (max) to try calling func(). + * func() will always be called at least once. * @param label: Label to include in the retry log message - * @param startingDelayMs: How long to wait before the first retry + * @param initialDelayMs: How long to wait before the first retry * @param backoffScaler: Multiplier to previous wait time - * @param count: Used "internally" for the recursive calls + * @param maxSingleDelayMs: Maximum interval between retries */ -export async function retry( - func: () => T, - times: number, - label: string, - startingDelayMs = 1000, +export async function retry({ + func, + maxAttempts, + label, + initialDelayMs = 1000, backoffScaler = 2, -): Promise { - for (let count = 0; count < times - 1; count++) { + maxSingleDelayMs = 1 * minute, +}: { + func: () => T; + maxAttempts: number; + label: string; + initialDelayMs?: number; + backoffScaler?: number; + maxSingleDelayMs?: number; +}): Promise { + let delayMs = initialDelayMs; + for (let count = 0; count < maxAttempts - 1; count++) { + const lastAttemptMs = Date.now(); try { return await func(); } catch (err) { - const delayMS = backoffScaler ** count * startingDelayMs; + if (count) { + // use Math.max to work around system time changes, e.g. DST + const elapsedMs = Math.max(0, Date.now() - lastAttemptMs); + // reduce delayMs by the time elapsed since the last attempt + delayMs = Math.max(initialDelayMs, delayMs - elapsedMs); + // increase delayMs by the backoffScaler factor + delayMs = Math.min(maxSingleDelayMs, delayMs * backoffScaler); + } + const sec = delayMs / 1000; + const secStr = sec < 10 ? sec.toFixed(1) : Math.round(sec).toString(); console.log( - `Retrying "${label}" after ${(delayMS / 1000).toFixed(2)}s (${ - count + 1 - } of ${times}) due to: ${err}`, + `Retrying "${label}" after ${secStr}s (${count + 1} of ${ + maxAttempts - 1 + }) due to: ${err}`, ); - await delay(delayMS); + await delay(delayMs); } } return await func(); diff --git a/tests/commands/logs.spec.ts b/tests/commands/logs.spec.ts index 49148920..08e0da43 100644 --- a/tests/commands/logs.spec.ts +++ b/tests/commands/logs.spec.ts @@ -46,20 +46,33 @@ describe('balena logs', function () { itS('should reach the expected endpoints on a local device', async () => { supervisor.expectGetPing(); supervisor.expectGetLogs(); + supervisor.expectGetLogs(); - const { err, out } = await runCommand('logs 1.2.3.4'); + const { err, out } = await runCommand('logs 1.2.3.4 --max-retry 1'); - expect(err).to.be.empty; + const errLines = cleanOutput(err, true); + const errMsg = + 'Max retry count (1) exceeded while attempting to reconnect to the device'; + if (process.env.DEBUG) { + expect(errLines).to.include(errMsg); + } else { + expect(errLines).to.have.members([errMsg]); + } const removeTimestamps = (logLine: string) => logLine.replace(/(?<=\[Logs\]) \[.+?\]/, ''); const cleanedOut = cleanOutput(out, true).map((l) => removeTimestamps(l)); - expect(cleanedOut).to.deep.equal([ + expect(cleanedOut).to.have.members([ '[Logs] Streaming logs', '[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', '[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', - '[Error] Connection to device lost', + '[Warn] Connection to device lost', + 'Retrying "Streaming logs" after 1.0s (1 of 1) due to: DeviceConnectionLostError: Connection to device lost', + '[Logs] Streaming logs', + '[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', + '[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux', + '[Warn] Connection to device lost', ]); }); }); From 07666e953fd9fb0e773e9158ec9068516c1233ef Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Thu, 3 Dec 2020 20:50:00 +0000 Subject: [PATCH 4/4] Livepush: Extend CTRL-C availability (don't ignore CTRL-C during image build) Change-type: patch --- lib/errors.ts | 15 +++- lib/utils/device/deploy.ts | 35 ++++---- lib/utils/device/errors.ts | 23 ++++- lib/utils/device/live.ts | 20 ++--- lib/utils/device/logs.ts | 15 +++- lib/utils/helpers.ts | 64 +++++++++++++- lib/utils/remote-build.ts | 174 ++++++++++++++++++------------------- 7 files changed, 217 insertions(+), 129 deletions(-) diff --git a/lib/errors.ts b/lib/errors.ts index d2dd8130..d89a66b4 100644 --- a/lib/errors.ts +++ b/lib/errors.ts @@ -21,7 +21,10 @@ import { TypedError } from 'typed-error'; import { getChalk, stripIndent } from './utils/lazy'; import { getHelp } from './utils/messages'; -export class ExpectedError extends TypedError {} +export class ExpectedError extends TypedError { + public code?: string; + public exitCode?: number; +} export class NotLoggedInError extends ExpectedError {} @@ -39,6 +42,8 @@ export class NoPortsDefinedError extends ExpectedError { } } +export class SIGINTError extends ExpectedError {} + /** * instanceOf is a more reliable implementation of the plain `instanceof` * typescript operator, for use with TypedError errors when the error @@ -177,7 +182,7 @@ async function sentryCaptureException(error: Error) { await Sentry.close(1000); } catch (e) { if (process.env.DEBUG) { - console.error('Timeout reporting error to sentry.io'); + console.error('[debug] Timeout reporting error to sentry.io'); } } } @@ -212,8 +217,9 @@ export async function handleError(error: Error) { if (!process.env.BALENARC_NO_SENTRY) { await sentryCaptureException(error); } - - // Unhandled/unexpected error: ensure that the process terminates. + } + if (error instanceof SIGINTError || !isExpectedError) { + // SIGINT or unexpected error: ensure that the process terminates. // The exit error code was set above through `process.exitCode`. process.exit(); } @@ -247,6 +253,7 @@ export const printExpectedErrorMessage = function (message: string) { * them and call this function. * * DEPRECATED: Use `throw new ExpectedError()` instead. + * If a specific process exit code x must be set, use process.exitCode = x */ export function exitWithExpectedError(message: string | Error): never { if (message instanceof Error) { diff --git a/lib/utils/device/deploy.ts b/lib/utils/device/deploy.ts index f057f9f5..c0d374bf 100644 --- a/lib/utils/device/deploy.ts +++ b/lib/utils/device/deploy.ts @@ -28,6 +28,7 @@ import { import type { Readable } from 'stream'; import { BALENA_ENGINE_TMP_PATH } from '../../config'; +import { ExpectedError } from '../../errors'; import { checkBuildSecretsRequirements, loadProject, @@ -37,7 +38,6 @@ import { import Logger = require('../logger'); import { DeviceAPI, DeviceInfo } from './api'; import * as LocalPushErrors from './errors'; -import { DeviceAPIError } from './errors'; import LivepushManager from './live'; import { displayBuildLog } from './logs'; @@ -76,7 +76,6 @@ async function environmentFromInput( serviceNames: string[], logger: Logger, ): Promise { - const { exitWithExpectedError } = await import('../../errors'); // A normal environment variable regex, with an added part // to find a colon followed servicename at the start const varRegex = /^(?:([^\s:]+):)?([^\s]+?)=(.*)$/; @@ -92,7 +91,7 @@ async function environmentFromInput( for (const env of envs) { const maybeMatch = env.match(varRegex); if (maybeMatch == null) { - exitWithExpectedError(`Unable to parse environment variable: ${env}`); + throw new ExpectedError(`Unable to parse environment variable: ${env}`); } const match = maybeMatch!; let service: string | undefined; @@ -122,8 +121,6 @@ async function environmentFromInput( } export async function deployToDevice(opts: DeviceDeployOptions): Promise { - const { exitWithExpectedError } = await import('../../errors'); - // Resolve .local addresses to IP to avoid // issue with Windows and rapid repeat lookups. // see: https://github.com/balena-io/balena-cli/issues/1518 @@ -143,7 +140,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { globalLogger.logDebug('Checking we can access device'); await api.ping(); } catch (e) { - exitWithExpectedError( + throw new ExpectedError( `Could not communicate with local mode device at address ${opts.deviceHost}`, ); } @@ -157,7 +154,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { const version = await api.getVersion(); globalLogger.logDebug(`Checking device version: ${version}`); if (!semver.satisfies(version, '>=7.21.4')) { - exitWithExpectedError(versionError); + throw new ExpectedError(versionError); } if (!opts.nolive && !semver.satisfies(version, '>=9.7.0')) { globalLogger.logWarn( @@ -168,8 +165,8 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { } catch (e) { // Very old supervisor versions do not support /version endpoint // a DeviceAPIError is expected in this case - if (e instanceof DeviceAPIError) { - exitWithExpectedError(versionError); + if (e instanceof LocalPushErrors.DeviceAPIError) { + throw new ExpectedError(versionError); } else { throw e; } @@ -209,7 +206,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { if (!opts.nolive) { buildLogs = {}; } - const buildTasks = await performBuilds( + + const { awaitInterruptibleTask } = await import('../helpers'); + const buildTasks = await awaitInterruptibleTask( + performBuilds, project.composition, tarStream, docker, @@ -269,10 +269,13 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { } globalLogger.logLivepush('Watching for file changes...'); } - await Promise.all(promises).finally(() => { + try { + await awaitInterruptibleTask(() => Promise.all(promises)); + } finally { // Stop watching files after log streaming ends (e.g. on SIGINT) livepush?.close(); - }); + await livepush?.cleanup(); + } } async function streamDeviceLogs( @@ -441,7 +444,9 @@ export async function rebuildSingleTask( ); if (task == null) { - throw new Error(`Could not find build task for service ${serviceName}`); + throw new ExpectedError( + `Could not find build task for service ${serviceName}`, + ); } await assignDockerBuildOpts(docker, [task], opts); @@ -610,8 +615,6 @@ export function generateTargetState( } async function inspectBuildResults(images: LocalImage[]): Promise { - const { exitWithExpectedError } = await import('../../errors'); - const failures: LocalPushErrors.BuildFailure[] = []; _.each(images, (image) => { @@ -624,6 +627,6 @@ async function inspectBuildResults(images: LocalImage[]): Promise { }); if (failures.length > 0) { - exitWithExpectedError(new LocalPushErrors.BuildError(failures).toString()); + throw new LocalPushErrors.BuildError(failures).toString(); } } diff --git a/lib/utils/device/errors.ts b/lib/utils/device/errors.ts index 082eed91..5455a803 100644 --- a/lib/utils/device/errors.ts +++ b/lib/utils/device/errors.ts @@ -1,12 +1,29 @@ +/** + * @license + * Copyright 2018-2020 Balena Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import * as _ from 'lodash'; -import { TypedError } from 'typed-error'; + +import { ExpectedError } from '../../errors'; export interface BuildFailure { error: Error; serviceName: string; } -export class BuildError extends TypedError { +export class BuildError extends ExpectedError { private failures: BuildFailure[]; public constructor(failures: BuildFailure[]) { @@ -33,7 +50,7 @@ export class BuildError extends TypedError { } } -export class DeviceAPIError extends TypedError {} +export class DeviceAPIError extends ExpectedError {} export class BadRequestDeviceAPIError extends DeviceAPIError {} export class ServiceUnavailableAPIError extends DeviceAPIError {} diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index 7ede20c4..23463d6c 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -219,17 +219,17 @@ export class LivepushManager { this.rebuildsCancelled[serviceName] = false; } } + } - // Setup cleanup handlers for the device - process.once('SIGINT', async () => { - this.logger.logLivepush('Cleaning up device...'); - await Promise.all( - _.map(this.containers, (container) => { - container.livepush.cleanupIntermediateContainers(); - }), - ); - this.logger.logDebug('Cleaning up done.'); - }); + /** Delete intermediate build containers from the device */ + public async cleanup() { + this.logger.logLivepush('Cleaning up device...'); + await Promise.all( + _.map(this.containers, (container) => + container.livepush.cleanupIntermediateContainers(), + ), + ); + this.logger.logDebug('Cleaning up done.'); } protected setupFilesystemWatcher( diff --git a/lib/utils/device/logs.ts b/lib/utils/device/logs.ts index ef7b00f4..1181564f 100644 --- a/lib/utils/device/logs.ts +++ b/lib/utils/device/logs.ts @@ -20,7 +20,7 @@ import type { Readable } from 'stream'; import { getChalk } from '../lazy'; import Logger = require('../logger'); -import { ExpectedError } from '../../errors'; +import { ExpectedError, SIGINTError } from '../../errors'; class DeviceConnectionLostError extends ExpectedError { public static defaultMsg = 'Connection to device lost'; @@ -62,15 +62,16 @@ async function displayDeviceLogs( system: boolean, filterServices?: string[], ): Promise { + const { addSIGINTHandler } = await import('../helpers'); let gotSignal = false; const handleSignal = () => { gotSignal = true; logs.emit('close'); }; - process.once('SIGINT', handleSignal); + addSIGINTHandler(handleSignal); process.once('SIGTERM', handleSignal); try { - await new Promise((resolve, reject) => { + await new Promise((_resolve, reject) => { logs.on('data', (log) => { displayLogLine(log, logger, system, filterServices); }); @@ -78,7 +79,7 @@ async function displayDeviceLogs( logs.once('end', () => { logger.logWarn(DeviceConnectionLostError.defaultMsg); if (gotSignal) { - resolve(); + reject(new SIGINTError('Log streaming aborted on SIGINT signal')); } else { reject(new DeviceConnectionLostError()); } @@ -90,6 +91,12 @@ async function displayDeviceLogs( } } +/** + * Open a TCP connection to the device's supervisor (TCP port 48484) and tail + * (display) device logs. Retry (reconnect) up to maxAttempts times if the + * TCP connection drops. Don't retry on SIGINT (CTRL-C). + * See function `displayDeviceLogs` for parameter documentation. + */ export async function connectAndDisplayDeviceLogs({ deviceApi, logger, diff --git a/lib/utils/helpers.ts b/lib/utils/helpers.ts index 5bdaa873..95e819b9 100644 --- a/lib/utils/helpers.ts +++ b/lib/utils/helpers.ts @@ -22,7 +22,7 @@ import * as os from 'os'; import type * as ShellEscape from 'shell-escape'; import type { Device, PineOptions } from 'balena-sdk'; -import { ExpectedError } from '../errors'; +import { ExpectedError, SIGINTError } from '../errors'; import { getBalenaSdk, getChalk, getVisuals } from './lazy'; import { promisify } from 'util'; import { isSubcommand } from '../preparser'; @@ -244,6 +244,10 @@ export async function retry({ try { return await func(); } catch (err) { + // Don't retry on SIGINT (CTRL-C) + if (err instanceof SIGINTError) { + throw err; + } if (count) { // use Math.max to work around system time changes, e.g. DST const elapsedMs = Math.max(0, Date.now() - lastAttemptMs); @@ -513,3 +517,61 @@ export const expandForAppName: PineOptions = { is_running__release: { $select: 'commit' }, }, }; + +/** + * Use the `readline` library on Windows to install SIGINT handlers. + * This appears to be necessary on MSYS / Git for Windows, and also useful + * with PowerShell to avoid the built-in "Terminate batch job? (Y/N)" prompt + * that appears to result in ungraceful / abrupt process termination. + */ +const installReadlineSigintEmitter = _.once(function emitSigint() { + if (process.platform === 'win32') { + const readline = require('readline') as typeof import('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + }); + rl.on('SIGINT', () => process.emit('SIGINT' as any)); + } +}); + +/** + * Centralized cross-platform logic to install a SIGINT handler + * @param sigintHandler The handler function + * @param once Whether the handler should be called no more than once + */ +export function addSIGINTHandler(sigintHandler: () => void, once = true) { + installReadlineSigintEmitter(); + if (once) { + process.once('SIGINT', sigintHandler); + } else { + process.on('SIGINT', sigintHandler); + } +} + +/** + * Call the given task function (which returns a promise) with the given + * arguments, await the returned promise and resolve to the same result. + * While awaiting for that promise, also await for a SIGINT signal (if any), + * with a new SIGINT handler that is automatically removed on return. + * If a SIGINT signal is received while awaiting for the task function, + * immediately return a promise that rejects with SIGINTError. + * @param task An async function to be executed and awaited + * @param theArgs Arguments to be passed to the task function + */ +export async function awaitInterruptibleTask< + T extends (...args: any[]) => Promise +>(task: T, ...theArgs: Parameters): Promise> { + let sigintHandler: () => void = () => undefined; + const sigintPromise = new Promise((_resolve, reject) => { + sigintHandler = () => { + reject(new SIGINTError('Task aborted on SIGINT signal')); + }; + addSIGINTHandler(sigintHandler); + }); + try { + return await Promise.race([sigintPromise, task(...theArgs)]); + } finally { + process.removeListener('SIGINT', sigintHandler); + } +} diff --git a/lib/utils/remote-build.ts b/lib/utils/remote-build.ts index 39951f14..2bf2403a 100644 --- a/lib/utils/remote-build.ts +++ b/lib/utils/remote-build.ts @@ -22,8 +22,7 @@ import type * as Stream from 'stream'; import streamToPromise = require('stream-to-promise'); import type { Pack } from 'tar-stream'; -import { ExpectedError } from '../errors'; -import { exitWithExpectedError } from '../errors'; +import { ExpectedError, SIGINTError } from '../errors'; import { tarDirectory } from './compose_ts'; import { getVisuals, stripIndent } from './lazy'; import Logger = require('./logger'); @@ -110,79 +109,76 @@ async function getBuilderEndpoint( } export async function startRemoteBuild(build: RemoteBuild): Promise { - const stream = await getRemoteBuildStream(build); + const [buildRequest, stream] = await getRemoteBuildStream(build); - // Special windows handling (win64 also reports win32) - if (process.platform === 'win32') { - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, - }); + // Setup CTRL-C handler so the user can interrupt the build + let cancellationPromise = Promise.resolve(); + const sigintHandler = () => { + process.exitCode = 130; + console.error('\nReceived SIGINT, cleaning up. Please wait.'); + try { + cancellationPromise = cancelBuildIfNecessary(build); + } catch (err) { + console.error(err.message); + } finally { + buildRequest.abort(); + const sigintErr = new SIGINTError('Build aborted on SIGINT signal'); + sigintErr.code = 'SIGINT'; + stream.emit('error', sigintErr); + } + }; - rl.on('SIGINT', () => process.emit('SIGINT' as any)); - } + const { addSIGINTHandler } = await import('./helpers'); + addSIGINTHandler(sigintHandler); - if (!build.opts.headless) { - return awaitRemoteBuildStream(build, stream); - } - - // We're running a headless build, which means we'll - // get a single object back, detailing if the build has - // been started - let result: HeadlessBuilderMessage; try { - const response = await streamToPromise(stream); - result = JSON.parse(response.toString()); - } catch (e) { - throw new Error( - `There was an error reading the response from the remote builder: ${e}`, - ); + if (build.opts.headless) { + await handleHeadlessBuildStream(stream); + } else { + await handleRemoteBuildStream(build, stream); + } + } finally { + process.removeListener('SIGINT', sigintHandler); + globalLogger.outputDeferredMessages(); + await cancellationPromise; } - handleHeadlessBuildMessage(result); } -async function awaitRemoteBuildStream( +async function handleRemoteBuildStream( build: RemoteBuild, - stream: NodeJS.ReadWriteStream, + stream: Stream.Stream, ) { - let sigintHandler: (() => Promise) | null = null; - try { - await new Promise((resolve, reject) => { - // Setup interrupt handlers so we can cancel the build if the user presses - // ctrl+c - sigintHandler = async () => { - process.exitCode = 130; - console.error('Received SIGINT, cleaning up. Please wait.'); - try { - await cancelBuildIfNecessary(build); - } catch (err) { - console.error(err.message); - } finally { - stream.end(); - } - }; - process.once('SIGINT', sigintHandler); - stream.on('data', getBuilderMessageHandler(build)); - stream.on('end', resolve); - stream.on('error', reject); - }); - } finally { - if (sigintHandler) { - process.removeListener('SIGINT', sigintHandler); - } - globalLogger.outputDeferredMessages(); - } + await new Promise((resolve, reject) => { + const msgHandler = getBuilderMessageHandler(build); + stream.on('data', msgHandler); + stream.once('end', resolve); + stream.once('error', reject); + }); if (build.hadError) { throw new RemoteBuildFailedError(); } } -function handleHeadlessBuildMessage(message: HeadlessBuilderMessage) { +async function handleHeadlessBuildStream(stream: Stream.Stream) { + // We're running a headless build, which means we'll + // get a single object back, detailing if the build has + // been started + let message: HeadlessBuilderMessage; + try { + const response = await streamToPromise(stream as NodeJS.ReadWriteStream); + message = JSON.parse(response.toString()); + } catch (e) { + if (e.code === 'SIGINT') { + throw e; + } + throw new Error( + `There was an error reading the response from the remote builder: ${e}`, + ); + } if (!process.stdout.isTTY) { process.stdout.write(JSON.stringify(message)); return; } - if (message.started) { console.log('Build successfully started'); console.log(` Release ID: ${message.releaseId!}`); @@ -266,6 +262,9 @@ function getBuilderMessageHandler( async function cancelBuildIfNecessary(build: RemoteBuild): Promise { if (build.releaseId != null) { + console.error( + `Setting 'cancelled' release status for release ID ${build.releaseId} ...`, + ); await build.sdk.pine.patch({ resource: 'release', id: build.releaseId, @@ -352,7 +351,7 @@ function createRemoteBuildRequest( headers: { 'Content-Encoding': 'gzip' }, body: tarStream.pipe(zlib.createGzip({ level: 6 })), }) - .on('error', onError) + .once('error', onError) // `.once` because the handler re-emits .once('response', (response: request.RequestResponse) => { if (response.statusCode >= 100 && response.statusCode < 400) { if (DEBUG_MODE) { @@ -368,28 +367,31 @@ function createRemoteBuildRequest( if (response.body) { msgArr.push(response.body); } - onError(new Error(msgArr.join('\n'))); + onError(new ExpectedError(msgArr.join('\n'))); } }); } async function getRemoteBuildStream( build: RemoteBuild, -): Promise { +): Promise<[request.Request, Stream.Stream]> { const builderUrl = await getBuilderEndpoint( build.baseUrl, build.owner, build.app, build.opts, ); - + let stream: Stream.Stream; let uploadSpinner = { stop: () => { /* noop */ }, }; - let exitOnError = (error: Error) => { - return exitWithExpectedError(error); + const onError = (error: Error) => { + uploadSpinner.stop(); + if (stream) { + stream.emit('error', error); + } }; // We only show the spinner when outputting to a tty if (process.stdout.isTTY) { @@ -397,36 +399,26 @@ async function getRemoteBuildStream( uploadSpinner = new visuals.Spinner( 'Uploading source package to balenaCloud', ); - exitOnError = (error: Error): never => { - uploadSpinner.stop(); - return exitWithExpectedError(error); - }; - // This is not strongly typed to start with, so we cast - // to any to allow the method call (uploadSpinner as any).start(); } - try { - const tarStream = await getTarStream(build); - const buildRequest = createRemoteBuildRequest( - build, - tarStream, - builderUrl, - exitOnError, - ); - let stream: NodeJS.ReadWriteStream; - if (build.opts.headless) { - stream = (buildRequest as unknown) as NodeJS.ReadWriteStream; - } else { - stream = buildRequest.pipe(JSONStream.parse('*')); - } - return stream - .once('close', () => uploadSpinner.stop()) - .once('data', () => uploadSpinner.stop()) - .once('end', () => uploadSpinner.stop()) - .once('error', () => uploadSpinner.stop()) - .once('finish', () => uploadSpinner.stop()); - } catch (error) { - return exitOnError(error); + const tarStream = await getTarStream(build); + const buildRequest = createRemoteBuildRequest( + build, + tarStream, + builderUrl, + onError, + ); + if (build.opts.headless) { + stream = buildRequest; + } else { + stream = buildRequest.pipe(JSONStream.parse('*')); } + stream = stream + .once('error', () => uploadSpinner.stop()) + .once('close', () => uploadSpinner.stop()) + .once('data', () => uploadSpinner.stop()) + .once('end', () => uploadSpinner.stop()) + .once('finish', () => uploadSpinner.stop()); + return [buildRequest, stream]; }