From 07666e953fd9fb0e773e9158ec9068516c1233ef Mon Sep 17 00:00:00 2001 From: Paulo Castro Date: Thu, 3 Dec 2020 20:50:00 +0000 Subject: [PATCH] Livepush: Extend CTRL-C availability (don't ignore CTRL-C during image build) Change-type: patch --- lib/errors.ts | 15 +++- lib/utils/device/deploy.ts | 35 ++++---- lib/utils/device/errors.ts | 23 ++++- lib/utils/device/live.ts | 20 ++--- lib/utils/device/logs.ts | 15 +++- lib/utils/helpers.ts | 64 +++++++++++++- lib/utils/remote-build.ts | 174 ++++++++++++++++++------------------- 7 files changed, 217 insertions(+), 129 deletions(-) diff --git a/lib/errors.ts b/lib/errors.ts index d2dd8130..d89a66b4 100644 --- a/lib/errors.ts +++ b/lib/errors.ts @@ -21,7 +21,10 @@ import { TypedError } from 'typed-error'; import { getChalk, stripIndent } from './utils/lazy'; import { getHelp } from './utils/messages'; -export class ExpectedError extends TypedError {} +export class ExpectedError extends TypedError { + public code?: string; + public exitCode?: number; +} export class NotLoggedInError extends ExpectedError {} @@ -39,6 +42,8 @@ export class NoPortsDefinedError extends ExpectedError { } } +export class SIGINTError extends ExpectedError {} + /** * instanceOf is a more reliable implementation of the plain `instanceof` * typescript operator, for use with TypedError errors when the error @@ -177,7 +182,7 @@ async function sentryCaptureException(error: Error) { await Sentry.close(1000); } catch (e) { if (process.env.DEBUG) { - console.error('Timeout reporting error to sentry.io'); + console.error('[debug] Timeout reporting error to sentry.io'); } } } @@ -212,8 +217,9 @@ export async function handleError(error: Error) { if (!process.env.BALENARC_NO_SENTRY) { await sentryCaptureException(error); } - - // Unhandled/unexpected error: ensure that the process terminates. + } + if (error instanceof SIGINTError || !isExpectedError) { + // SIGINT or unexpected error: ensure that the process terminates. // The exit error code was set above through `process.exitCode`. process.exit(); } @@ -247,6 +253,7 @@ export const printExpectedErrorMessage = function (message: string) { * them and call this function. * * DEPRECATED: Use `throw new ExpectedError()` instead. + * If a specific process exit code x must be set, use process.exitCode = x */ export function exitWithExpectedError(message: string | Error): never { if (message instanceof Error) { diff --git a/lib/utils/device/deploy.ts b/lib/utils/device/deploy.ts index f057f9f5..c0d374bf 100644 --- a/lib/utils/device/deploy.ts +++ b/lib/utils/device/deploy.ts @@ -28,6 +28,7 @@ import { import type { Readable } from 'stream'; import { BALENA_ENGINE_TMP_PATH } from '../../config'; +import { ExpectedError } from '../../errors'; import { checkBuildSecretsRequirements, loadProject, @@ -37,7 +38,6 @@ import { import Logger = require('../logger'); import { DeviceAPI, DeviceInfo } from './api'; import * as LocalPushErrors from './errors'; -import { DeviceAPIError } from './errors'; import LivepushManager from './live'; import { displayBuildLog } from './logs'; @@ -76,7 +76,6 @@ async function environmentFromInput( serviceNames: string[], logger: Logger, ): Promise { - const { exitWithExpectedError } = await import('../../errors'); // A normal environment variable regex, with an added part // to find a colon followed servicename at the start const varRegex = /^(?:([^\s:]+):)?([^\s]+?)=(.*)$/; @@ -92,7 +91,7 @@ async function environmentFromInput( for (const env of envs) { const maybeMatch = env.match(varRegex); if (maybeMatch == null) { - exitWithExpectedError(`Unable to parse environment variable: ${env}`); + throw new ExpectedError(`Unable to parse environment variable: ${env}`); } const match = maybeMatch!; let service: string | undefined; @@ -122,8 +121,6 @@ async function environmentFromInput( } export async function deployToDevice(opts: DeviceDeployOptions): Promise { - const { exitWithExpectedError } = await import('../../errors'); - // Resolve .local addresses to IP to avoid // issue with Windows and rapid repeat lookups. // see: https://github.com/balena-io/balena-cli/issues/1518 @@ -143,7 +140,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { globalLogger.logDebug('Checking we can access device'); await api.ping(); } catch (e) { - exitWithExpectedError( + throw new ExpectedError( `Could not communicate with local mode device at address ${opts.deviceHost}`, ); } @@ -157,7 +154,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { const version = await api.getVersion(); globalLogger.logDebug(`Checking device version: ${version}`); if (!semver.satisfies(version, '>=7.21.4')) { - exitWithExpectedError(versionError); + throw new ExpectedError(versionError); } if (!opts.nolive && !semver.satisfies(version, '>=9.7.0')) { globalLogger.logWarn( @@ -168,8 +165,8 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { } catch (e) { // Very old supervisor versions do not support /version endpoint // a DeviceAPIError is expected in this case - if (e instanceof DeviceAPIError) { - exitWithExpectedError(versionError); + if (e instanceof LocalPushErrors.DeviceAPIError) { + throw new ExpectedError(versionError); } else { throw e; } @@ -209,7 +206,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { if (!opts.nolive) { buildLogs = {}; } - const buildTasks = await performBuilds( + + const { awaitInterruptibleTask } = await import('../helpers'); + const buildTasks = await awaitInterruptibleTask( + performBuilds, project.composition, tarStream, docker, @@ -269,10 +269,13 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise { } globalLogger.logLivepush('Watching for file changes...'); } - await Promise.all(promises).finally(() => { + try { + await awaitInterruptibleTask(() => Promise.all(promises)); + } finally { // Stop watching files after log streaming ends (e.g. on SIGINT) livepush?.close(); - }); + await livepush?.cleanup(); + } } async function streamDeviceLogs( @@ -441,7 +444,9 @@ export async function rebuildSingleTask( ); if (task == null) { - throw new Error(`Could not find build task for service ${serviceName}`); + throw new ExpectedError( + `Could not find build task for service ${serviceName}`, + ); } await assignDockerBuildOpts(docker, [task], opts); @@ -610,8 +615,6 @@ export function generateTargetState( } async function inspectBuildResults(images: LocalImage[]): Promise { - const { exitWithExpectedError } = await import('../../errors'); - const failures: LocalPushErrors.BuildFailure[] = []; _.each(images, (image) => { @@ -624,6 +627,6 @@ async function inspectBuildResults(images: LocalImage[]): Promise { }); if (failures.length > 0) { - exitWithExpectedError(new LocalPushErrors.BuildError(failures).toString()); + throw new LocalPushErrors.BuildError(failures).toString(); } } diff --git a/lib/utils/device/errors.ts b/lib/utils/device/errors.ts index 082eed91..5455a803 100644 --- a/lib/utils/device/errors.ts +++ b/lib/utils/device/errors.ts @@ -1,12 +1,29 @@ +/** + * @license + * Copyright 2018-2020 Balena Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import * as _ from 'lodash'; -import { TypedError } from 'typed-error'; + +import { ExpectedError } from '../../errors'; export interface BuildFailure { error: Error; serviceName: string; } -export class BuildError extends TypedError { +export class BuildError extends ExpectedError { private failures: BuildFailure[]; public constructor(failures: BuildFailure[]) { @@ -33,7 +50,7 @@ export class BuildError extends TypedError { } } -export class DeviceAPIError extends TypedError {} +export class DeviceAPIError extends ExpectedError {} export class BadRequestDeviceAPIError extends DeviceAPIError {} export class ServiceUnavailableAPIError extends DeviceAPIError {} diff --git a/lib/utils/device/live.ts b/lib/utils/device/live.ts index 7ede20c4..23463d6c 100644 --- a/lib/utils/device/live.ts +++ b/lib/utils/device/live.ts @@ -219,17 +219,17 @@ export class LivepushManager { this.rebuildsCancelled[serviceName] = false; } } + } - // Setup cleanup handlers for the device - process.once('SIGINT', async () => { - this.logger.logLivepush('Cleaning up device...'); - await Promise.all( - _.map(this.containers, (container) => { - container.livepush.cleanupIntermediateContainers(); - }), - ); - this.logger.logDebug('Cleaning up done.'); - }); + /** Delete intermediate build containers from the device */ + public async cleanup() { + this.logger.logLivepush('Cleaning up device...'); + await Promise.all( + _.map(this.containers, (container) => + container.livepush.cleanupIntermediateContainers(), + ), + ); + this.logger.logDebug('Cleaning up done.'); } protected setupFilesystemWatcher( diff --git a/lib/utils/device/logs.ts b/lib/utils/device/logs.ts index ef7b00f4..1181564f 100644 --- a/lib/utils/device/logs.ts +++ b/lib/utils/device/logs.ts @@ -20,7 +20,7 @@ import type { Readable } from 'stream'; import { getChalk } from '../lazy'; import Logger = require('../logger'); -import { ExpectedError } from '../../errors'; +import { ExpectedError, SIGINTError } from '../../errors'; class DeviceConnectionLostError extends ExpectedError { public static defaultMsg = 'Connection to device lost'; @@ -62,15 +62,16 @@ async function displayDeviceLogs( system: boolean, filterServices?: string[], ): Promise { + const { addSIGINTHandler } = await import('../helpers'); let gotSignal = false; const handleSignal = () => { gotSignal = true; logs.emit('close'); }; - process.once('SIGINT', handleSignal); + addSIGINTHandler(handleSignal); process.once('SIGTERM', handleSignal); try { - await new Promise((resolve, reject) => { + await new Promise((_resolve, reject) => { logs.on('data', (log) => { displayLogLine(log, logger, system, filterServices); }); @@ -78,7 +79,7 @@ async function displayDeviceLogs( logs.once('end', () => { logger.logWarn(DeviceConnectionLostError.defaultMsg); if (gotSignal) { - resolve(); + reject(new SIGINTError('Log streaming aborted on SIGINT signal')); } else { reject(new DeviceConnectionLostError()); } @@ -90,6 +91,12 @@ async function displayDeviceLogs( } } +/** + * Open a TCP connection to the device's supervisor (TCP port 48484) and tail + * (display) device logs. Retry (reconnect) up to maxAttempts times if the + * TCP connection drops. Don't retry on SIGINT (CTRL-C). + * See function `displayDeviceLogs` for parameter documentation. + */ export async function connectAndDisplayDeviceLogs({ deviceApi, logger, diff --git a/lib/utils/helpers.ts b/lib/utils/helpers.ts index 5bdaa873..95e819b9 100644 --- a/lib/utils/helpers.ts +++ b/lib/utils/helpers.ts @@ -22,7 +22,7 @@ import * as os from 'os'; import type * as ShellEscape from 'shell-escape'; import type { Device, PineOptions } from 'balena-sdk'; -import { ExpectedError } from '../errors'; +import { ExpectedError, SIGINTError } from '../errors'; import { getBalenaSdk, getChalk, getVisuals } from './lazy'; import { promisify } from 'util'; import { isSubcommand } from '../preparser'; @@ -244,6 +244,10 @@ export async function retry({ try { return await func(); } catch (err) { + // Don't retry on SIGINT (CTRL-C) + if (err instanceof SIGINTError) { + throw err; + } if (count) { // use Math.max to work around system time changes, e.g. DST const elapsedMs = Math.max(0, Date.now() - lastAttemptMs); @@ -513,3 +517,61 @@ export const expandForAppName: PineOptions = { is_running__release: { $select: 'commit' }, }, }; + +/** + * Use the `readline` library on Windows to install SIGINT handlers. + * This appears to be necessary on MSYS / Git for Windows, and also useful + * with PowerShell to avoid the built-in "Terminate batch job? (Y/N)" prompt + * that appears to result in ungraceful / abrupt process termination. + */ +const installReadlineSigintEmitter = _.once(function emitSigint() { + if (process.platform === 'win32') { + const readline = require('readline') as typeof import('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + }); + rl.on('SIGINT', () => process.emit('SIGINT' as any)); + } +}); + +/** + * Centralized cross-platform logic to install a SIGINT handler + * @param sigintHandler The handler function + * @param once Whether the handler should be called no more than once + */ +export function addSIGINTHandler(sigintHandler: () => void, once = true) { + installReadlineSigintEmitter(); + if (once) { + process.once('SIGINT', sigintHandler); + } else { + process.on('SIGINT', sigintHandler); + } +} + +/** + * Call the given task function (which returns a promise) with the given + * arguments, await the returned promise and resolve to the same result. + * While awaiting for that promise, also await for a SIGINT signal (if any), + * with a new SIGINT handler that is automatically removed on return. + * If a SIGINT signal is received while awaiting for the task function, + * immediately return a promise that rejects with SIGINTError. + * @param task An async function to be executed and awaited + * @param theArgs Arguments to be passed to the task function + */ +export async function awaitInterruptibleTask< + T extends (...args: any[]) => Promise +>(task: T, ...theArgs: Parameters): Promise> { + let sigintHandler: () => void = () => undefined; + const sigintPromise = new Promise((_resolve, reject) => { + sigintHandler = () => { + reject(new SIGINTError('Task aborted on SIGINT signal')); + }; + addSIGINTHandler(sigintHandler); + }); + try { + return await Promise.race([sigintPromise, task(...theArgs)]); + } finally { + process.removeListener('SIGINT', sigintHandler); + } +} diff --git a/lib/utils/remote-build.ts b/lib/utils/remote-build.ts index 39951f14..2bf2403a 100644 --- a/lib/utils/remote-build.ts +++ b/lib/utils/remote-build.ts @@ -22,8 +22,7 @@ import type * as Stream from 'stream'; import streamToPromise = require('stream-to-promise'); import type { Pack } from 'tar-stream'; -import { ExpectedError } from '../errors'; -import { exitWithExpectedError } from '../errors'; +import { ExpectedError, SIGINTError } from '../errors'; import { tarDirectory } from './compose_ts'; import { getVisuals, stripIndent } from './lazy'; import Logger = require('./logger'); @@ -110,79 +109,76 @@ async function getBuilderEndpoint( } export async function startRemoteBuild(build: RemoteBuild): Promise { - const stream = await getRemoteBuildStream(build); + const [buildRequest, stream] = await getRemoteBuildStream(build); - // Special windows handling (win64 also reports win32) - if (process.platform === 'win32') { - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, - }); + // Setup CTRL-C handler so the user can interrupt the build + let cancellationPromise = Promise.resolve(); + const sigintHandler = () => { + process.exitCode = 130; + console.error('\nReceived SIGINT, cleaning up. Please wait.'); + try { + cancellationPromise = cancelBuildIfNecessary(build); + } catch (err) { + console.error(err.message); + } finally { + buildRequest.abort(); + const sigintErr = new SIGINTError('Build aborted on SIGINT signal'); + sigintErr.code = 'SIGINT'; + stream.emit('error', sigintErr); + } + }; - rl.on('SIGINT', () => process.emit('SIGINT' as any)); - } + const { addSIGINTHandler } = await import('./helpers'); + addSIGINTHandler(sigintHandler); - if (!build.opts.headless) { - return awaitRemoteBuildStream(build, stream); - } - - // We're running a headless build, which means we'll - // get a single object back, detailing if the build has - // been started - let result: HeadlessBuilderMessage; try { - const response = await streamToPromise(stream); - result = JSON.parse(response.toString()); - } catch (e) { - throw new Error( - `There was an error reading the response from the remote builder: ${e}`, - ); + if (build.opts.headless) { + await handleHeadlessBuildStream(stream); + } else { + await handleRemoteBuildStream(build, stream); + } + } finally { + process.removeListener('SIGINT', sigintHandler); + globalLogger.outputDeferredMessages(); + await cancellationPromise; } - handleHeadlessBuildMessage(result); } -async function awaitRemoteBuildStream( +async function handleRemoteBuildStream( build: RemoteBuild, - stream: NodeJS.ReadWriteStream, + stream: Stream.Stream, ) { - let sigintHandler: (() => Promise) | null = null; - try { - await new Promise((resolve, reject) => { - // Setup interrupt handlers so we can cancel the build if the user presses - // ctrl+c - sigintHandler = async () => { - process.exitCode = 130; - console.error('Received SIGINT, cleaning up. Please wait.'); - try { - await cancelBuildIfNecessary(build); - } catch (err) { - console.error(err.message); - } finally { - stream.end(); - } - }; - process.once('SIGINT', sigintHandler); - stream.on('data', getBuilderMessageHandler(build)); - stream.on('end', resolve); - stream.on('error', reject); - }); - } finally { - if (sigintHandler) { - process.removeListener('SIGINT', sigintHandler); - } - globalLogger.outputDeferredMessages(); - } + await new Promise((resolve, reject) => { + const msgHandler = getBuilderMessageHandler(build); + stream.on('data', msgHandler); + stream.once('end', resolve); + stream.once('error', reject); + }); if (build.hadError) { throw new RemoteBuildFailedError(); } } -function handleHeadlessBuildMessage(message: HeadlessBuilderMessage) { +async function handleHeadlessBuildStream(stream: Stream.Stream) { + // We're running a headless build, which means we'll + // get a single object back, detailing if the build has + // been started + let message: HeadlessBuilderMessage; + try { + const response = await streamToPromise(stream as NodeJS.ReadWriteStream); + message = JSON.parse(response.toString()); + } catch (e) { + if (e.code === 'SIGINT') { + throw e; + } + throw new Error( + `There was an error reading the response from the remote builder: ${e}`, + ); + } if (!process.stdout.isTTY) { process.stdout.write(JSON.stringify(message)); return; } - if (message.started) { console.log('Build successfully started'); console.log(` Release ID: ${message.releaseId!}`); @@ -266,6 +262,9 @@ function getBuilderMessageHandler( async function cancelBuildIfNecessary(build: RemoteBuild): Promise { if (build.releaseId != null) { + console.error( + `Setting 'cancelled' release status for release ID ${build.releaseId} ...`, + ); await build.sdk.pine.patch({ resource: 'release', id: build.releaseId, @@ -352,7 +351,7 @@ function createRemoteBuildRequest( headers: { 'Content-Encoding': 'gzip' }, body: tarStream.pipe(zlib.createGzip({ level: 6 })), }) - .on('error', onError) + .once('error', onError) // `.once` because the handler re-emits .once('response', (response: request.RequestResponse) => { if (response.statusCode >= 100 && response.statusCode < 400) { if (DEBUG_MODE) { @@ -368,28 +367,31 @@ function createRemoteBuildRequest( if (response.body) { msgArr.push(response.body); } - onError(new Error(msgArr.join('\n'))); + onError(new ExpectedError(msgArr.join('\n'))); } }); } async function getRemoteBuildStream( build: RemoteBuild, -): Promise { +): Promise<[request.Request, Stream.Stream]> { const builderUrl = await getBuilderEndpoint( build.baseUrl, build.owner, build.app, build.opts, ); - + let stream: Stream.Stream; let uploadSpinner = { stop: () => { /* noop */ }, }; - let exitOnError = (error: Error) => { - return exitWithExpectedError(error); + const onError = (error: Error) => { + uploadSpinner.stop(); + if (stream) { + stream.emit('error', error); + } }; // We only show the spinner when outputting to a tty if (process.stdout.isTTY) { @@ -397,36 +399,26 @@ async function getRemoteBuildStream( uploadSpinner = new visuals.Spinner( 'Uploading source package to balenaCloud', ); - exitOnError = (error: Error): never => { - uploadSpinner.stop(); - return exitWithExpectedError(error); - }; - // This is not strongly typed to start with, so we cast - // to any to allow the method call (uploadSpinner as any).start(); } - try { - const tarStream = await getTarStream(build); - const buildRequest = createRemoteBuildRequest( - build, - tarStream, - builderUrl, - exitOnError, - ); - let stream: NodeJS.ReadWriteStream; - if (build.opts.headless) { - stream = (buildRequest as unknown) as NodeJS.ReadWriteStream; - } else { - stream = buildRequest.pipe(JSONStream.parse('*')); - } - return stream - .once('close', () => uploadSpinner.stop()) - .once('data', () => uploadSpinner.stop()) - .once('end', () => uploadSpinner.stop()) - .once('error', () => uploadSpinner.stop()) - .once('finish', () => uploadSpinner.stop()); - } catch (error) { - return exitOnError(error); + const tarStream = await getTarStream(build); + const buildRequest = createRemoteBuildRequest( + build, + tarStream, + builderUrl, + onError, + ); + if (build.opts.headless) { + stream = buildRequest; + } else { + stream = buildRequest.pipe(JSONStream.parse('*')); } + stream = stream + .once('error', () => uploadSpinner.stop()) + .once('close', () => uploadSpinner.stop()) + .once('data', () => uploadSpinner.stop()) + .once('end', () => uploadSpinner.stop()) + .once('finish', () => uploadSpinner.stop()); + return [buildRequest, stream]; }