Livepush: Extend CTRL-C availability (don't ignore CTRL-C during image build)

Change-type: patch
This commit is contained in:
Paulo Castro 2020-12-03 20:50:00 +00:00
parent 54731c2d20
commit 07666e953f
7 changed files with 217 additions and 129 deletions

View File

@ -21,7 +21,10 @@ import { TypedError } from 'typed-error';
import { getChalk, stripIndent } from './utils/lazy';
import { getHelp } from './utils/messages';
export class ExpectedError extends TypedError {}
export class ExpectedError extends TypedError {
public code?: string;
public exitCode?: number;
}
export class NotLoggedInError extends ExpectedError {}
@ -39,6 +42,8 @@ export class NoPortsDefinedError extends ExpectedError {
}
}
export class SIGINTError extends ExpectedError {}
/**
* instanceOf is a more reliable implementation of the plain `instanceof`
* typescript operator, for use with TypedError errors when the error
@ -177,7 +182,7 @@ async function sentryCaptureException(error: Error) {
await Sentry.close(1000);
} catch (e) {
if (process.env.DEBUG) {
console.error('Timeout reporting error to sentry.io');
console.error('[debug] Timeout reporting error to sentry.io');
}
}
}
@ -212,8 +217,9 @@ export async function handleError(error: Error) {
if (!process.env.BALENARC_NO_SENTRY) {
await sentryCaptureException(error);
}
// Unhandled/unexpected error: ensure that the process terminates.
}
if (error instanceof SIGINTError || !isExpectedError) {
// SIGINT or unexpected error: ensure that the process terminates.
// The exit error code was set above through `process.exitCode`.
process.exit();
}
@ -247,6 +253,7 @@ export const printExpectedErrorMessage = function (message: string) {
* them and call this function.
*
* DEPRECATED: Use `throw new ExpectedError(<message>)` instead.
* If a specific process exit code x must be set, use process.exitCode = x
*/
export function exitWithExpectedError(message: string | Error): never {
if (message instanceof Error) {

View File

@ -28,6 +28,7 @@ import {
import type { Readable } from 'stream';
import { BALENA_ENGINE_TMP_PATH } from '../../config';
import { ExpectedError } from '../../errors';
import {
checkBuildSecretsRequirements,
loadProject,
@ -37,7 +38,6 @@ import {
import Logger = require('../logger');
import { DeviceAPI, DeviceInfo } from './api';
import * as LocalPushErrors from './errors';
import { DeviceAPIError } from './errors';
import LivepushManager from './live';
import { displayBuildLog } from './logs';
@ -76,7 +76,6 @@ async function environmentFromInput(
serviceNames: string[],
logger: Logger,
): Promise<ParsedEnvironment> {
const { exitWithExpectedError } = await import('../../errors');
// A normal environment variable regex, with an added part
// to find a colon followed servicename at the start
const varRegex = /^(?:([^\s:]+):)?([^\s]+?)=(.*)$/;
@ -92,7 +91,7 @@ async function environmentFromInput(
for (const env of envs) {
const maybeMatch = env.match(varRegex);
if (maybeMatch == null) {
exitWithExpectedError(`Unable to parse environment variable: ${env}`);
throw new ExpectedError(`Unable to parse environment variable: ${env}`);
}
const match = maybeMatch!;
let service: string | undefined;
@ -122,8 +121,6 @@ async function environmentFromInput(
}
export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
const { exitWithExpectedError } = await import('../../errors');
// Resolve .local addresses to IP to avoid
// issue with Windows and rapid repeat lookups.
// see: https://github.com/balena-io/balena-cli/issues/1518
@ -143,7 +140,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
globalLogger.logDebug('Checking we can access device');
await api.ping();
} catch (e) {
exitWithExpectedError(
throw new ExpectedError(
`Could not communicate with local mode device at address ${opts.deviceHost}`,
);
}
@ -157,7 +154,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
const version = await api.getVersion();
globalLogger.logDebug(`Checking device version: ${version}`);
if (!semver.satisfies(version, '>=7.21.4')) {
exitWithExpectedError(versionError);
throw new ExpectedError(versionError);
}
if (!opts.nolive && !semver.satisfies(version, '>=9.7.0')) {
globalLogger.logWarn(
@ -168,8 +165,8 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
} catch (e) {
// Very old supervisor versions do not support /version endpoint
// a DeviceAPIError is expected in this case
if (e instanceof DeviceAPIError) {
exitWithExpectedError(versionError);
if (e instanceof LocalPushErrors.DeviceAPIError) {
throw new ExpectedError(versionError);
} else {
throw e;
}
@ -209,7 +206,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
if (!opts.nolive) {
buildLogs = {};
}
const buildTasks = await performBuilds(
const { awaitInterruptibleTask } = await import('../helpers');
const buildTasks = await awaitInterruptibleTask<typeof performBuilds>(
performBuilds,
project.composition,
tarStream,
docker,
@ -269,10 +269,13 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
}
globalLogger.logLivepush('Watching for file changes...');
}
await Promise.all(promises).finally(() => {
try {
await awaitInterruptibleTask(() => Promise.all(promises));
} finally {
// Stop watching files after log streaming ends (e.g. on SIGINT)
livepush?.close();
});
await livepush?.cleanup();
}
}
async function streamDeviceLogs(
@ -441,7 +444,9 @@ export async function rebuildSingleTask(
);
if (task == null) {
throw new Error(`Could not find build task for service ${serviceName}`);
throw new ExpectedError(
`Could not find build task for service ${serviceName}`,
);
}
await assignDockerBuildOpts(docker, [task], opts);
@ -610,8 +615,6 @@ export function generateTargetState(
}
async function inspectBuildResults(images: LocalImage[]): Promise<void> {
const { exitWithExpectedError } = await import('../../errors');
const failures: LocalPushErrors.BuildFailure[] = [];
_.each(images, (image) => {
@ -624,6 +627,6 @@ async function inspectBuildResults(images: LocalImage[]): Promise<void> {
});
if (failures.length > 0) {
exitWithExpectedError(new LocalPushErrors.BuildError(failures).toString());
throw new LocalPushErrors.BuildError(failures).toString();
}
}

View File

@ -1,12 +1,29 @@
/**
* @license
* Copyright 2018-2020 Balena Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as _ from 'lodash';
import { TypedError } from 'typed-error';
import { ExpectedError } from '../../errors';
export interface BuildFailure {
error: Error;
serviceName: string;
}
export class BuildError extends TypedError {
export class BuildError extends ExpectedError {
private failures: BuildFailure[];
public constructor(failures: BuildFailure[]) {
@ -33,7 +50,7 @@ export class BuildError extends TypedError {
}
}
export class DeviceAPIError extends TypedError {}
export class DeviceAPIError extends ExpectedError {}
export class BadRequestDeviceAPIError extends DeviceAPIError {}
export class ServiceUnavailableAPIError extends DeviceAPIError {}

View File

@ -219,17 +219,17 @@ export class LivepushManager {
this.rebuildsCancelled[serviceName] = false;
}
}
}
// Setup cleanup handlers for the device
process.once('SIGINT', async () => {
this.logger.logLivepush('Cleaning up device...');
await Promise.all(
_.map(this.containers, (container) => {
container.livepush.cleanupIntermediateContainers();
}),
);
this.logger.logDebug('Cleaning up done.');
});
/** Delete intermediate build containers from the device */
public async cleanup() {
this.logger.logLivepush('Cleaning up device...');
await Promise.all(
_.map(this.containers, (container) =>
container.livepush.cleanupIntermediateContainers(),
),
);
this.logger.logDebug('Cleaning up done.');
}
protected setupFilesystemWatcher(

View File

@ -20,7 +20,7 @@ import type { Readable } from 'stream';
import { getChalk } from '../lazy';
import Logger = require('../logger');
import { ExpectedError } from '../../errors';
import { ExpectedError, SIGINTError } from '../../errors';
class DeviceConnectionLostError extends ExpectedError {
public static defaultMsg = 'Connection to device lost';
@ -62,15 +62,16 @@ async function displayDeviceLogs(
system: boolean,
filterServices?: string[],
): Promise<void> {
const { addSIGINTHandler } = await import('../helpers');
let gotSignal = false;
const handleSignal = () => {
gotSignal = true;
logs.emit('close');
};
process.once('SIGINT', handleSignal);
addSIGINTHandler(handleSignal);
process.once('SIGTERM', handleSignal);
try {
await new Promise((resolve, reject) => {
await new Promise((_resolve, reject) => {
logs.on('data', (log) => {
displayLogLine(log, logger, system, filterServices);
});
@ -78,7 +79,7 @@ async function displayDeviceLogs(
logs.once('end', () => {
logger.logWarn(DeviceConnectionLostError.defaultMsg);
if (gotSignal) {
resolve();
reject(new SIGINTError('Log streaming aborted on SIGINT signal'));
} else {
reject(new DeviceConnectionLostError());
}
@ -90,6 +91,12 @@ async function displayDeviceLogs(
}
}
/**
* Open a TCP connection to the device's supervisor (TCP port 48484) and tail
* (display) device logs. Retry (reconnect) up to maxAttempts times if the
* TCP connection drops. Don't retry on SIGINT (CTRL-C).
* See function `displayDeviceLogs` for parameter documentation.
*/
export async function connectAndDisplayDeviceLogs({
deviceApi,
logger,

View File

@ -22,7 +22,7 @@ import * as os from 'os';
import type * as ShellEscape from 'shell-escape';
import type { Device, PineOptions } from 'balena-sdk';
import { ExpectedError } from '../errors';
import { ExpectedError, SIGINTError } from '../errors';
import { getBalenaSdk, getChalk, getVisuals } from './lazy';
import { promisify } from 'util';
import { isSubcommand } from '../preparser';
@ -244,6 +244,10 @@ export async function retry<T>({
try {
return await func();
} catch (err) {
// Don't retry on SIGINT (CTRL-C)
if (err instanceof SIGINTError) {
throw err;
}
if (count) {
// use Math.max to work around system time changes, e.g. DST
const elapsedMs = Math.max(0, Date.now() - lastAttemptMs);
@ -513,3 +517,61 @@ export const expandForAppName: PineOptions<Device> = {
is_running__release: { $select: 'commit' },
},
};
/**
* Use the `readline` library on Windows to install SIGINT handlers.
* This appears to be necessary on MSYS / Git for Windows, and also useful
* with PowerShell to avoid the built-in "Terminate batch job? (Y/N)" prompt
* that appears to result in ungraceful / abrupt process termination.
*/
const installReadlineSigintEmitter = _.once(function emitSigint() {
if (process.platform === 'win32') {
const readline = require('readline') as typeof import('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.on('SIGINT', () => process.emit('SIGINT' as any));
}
});
/**
* Centralized cross-platform logic to install a SIGINT handler
* @param sigintHandler The handler function
* @param once Whether the handler should be called no more than once
*/
export function addSIGINTHandler(sigintHandler: () => void, once = true) {
installReadlineSigintEmitter();
if (once) {
process.once('SIGINT', sigintHandler);
} else {
process.on('SIGINT', sigintHandler);
}
}
/**
* Call the given task function (which returns a promise) with the given
* arguments, await the returned promise and resolve to the same result.
* While awaiting for that promise, also await for a SIGINT signal (if any),
* with a new SIGINT handler that is automatically removed on return.
* If a SIGINT signal is received while awaiting for the task function,
* immediately return a promise that rejects with SIGINTError.
* @param task An async function to be executed and awaited
* @param theArgs Arguments to be passed to the task function
*/
export async function awaitInterruptibleTask<
T extends (...args: any[]) => Promise<any>
>(task: T, ...theArgs: Parameters<T>): Promise<ReturnType<T>> {
let sigintHandler: () => void = () => undefined;
const sigintPromise = new Promise<T>((_resolve, reject) => {
sigintHandler = () => {
reject(new SIGINTError('Task aborted on SIGINT signal'));
};
addSIGINTHandler(sigintHandler);
});
try {
return await Promise.race([sigintPromise, task(...theArgs)]);
} finally {
process.removeListener('SIGINT', sigintHandler);
}
}

View File

@ -22,8 +22,7 @@ import type * as Stream from 'stream';
import streamToPromise = require('stream-to-promise');
import type { Pack } from 'tar-stream';
import { ExpectedError } from '../errors';
import { exitWithExpectedError } from '../errors';
import { ExpectedError, SIGINTError } from '../errors';
import { tarDirectory } from './compose_ts';
import { getVisuals, stripIndent } from './lazy';
import Logger = require('./logger');
@ -110,79 +109,76 @@ async function getBuilderEndpoint(
}
export async function startRemoteBuild(build: RemoteBuild): Promise<void> {
const stream = await getRemoteBuildStream(build);
const [buildRequest, stream] = await getRemoteBuildStream(build);
// Special windows handling (win64 also reports win32)
if (process.platform === 'win32') {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
// Setup CTRL-C handler so the user can interrupt the build
let cancellationPromise = Promise.resolve();
const sigintHandler = () => {
process.exitCode = 130;
console.error('\nReceived SIGINT, cleaning up. Please wait.');
try {
cancellationPromise = cancelBuildIfNecessary(build);
} catch (err) {
console.error(err.message);
} finally {
buildRequest.abort();
const sigintErr = new SIGINTError('Build aborted on SIGINT signal');
sigintErr.code = 'SIGINT';
stream.emit('error', sigintErr);
}
};
rl.on('SIGINT', () => process.emit('SIGINT' as any));
}
const { addSIGINTHandler } = await import('./helpers');
addSIGINTHandler(sigintHandler);
if (!build.opts.headless) {
return awaitRemoteBuildStream(build, stream);
}
// We're running a headless build, which means we'll
// get a single object back, detailing if the build has
// been started
let result: HeadlessBuilderMessage;
try {
const response = await streamToPromise(stream);
result = JSON.parse(response.toString());
} catch (e) {
throw new Error(
`There was an error reading the response from the remote builder: ${e}`,
);
if (build.opts.headless) {
await handleHeadlessBuildStream(stream);
} else {
await handleRemoteBuildStream(build, stream);
}
} finally {
process.removeListener('SIGINT', sigintHandler);
globalLogger.outputDeferredMessages();
await cancellationPromise;
}
handleHeadlessBuildMessage(result);
}
async function awaitRemoteBuildStream(
async function handleRemoteBuildStream(
build: RemoteBuild,
stream: NodeJS.ReadWriteStream,
stream: Stream.Stream,
) {
let sigintHandler: (() => Promise<void>) | null = null;
try {
await new Promise((resolve, reject) => {
// Setup interrupt handlers so we can cancel the build if the user presses
// ctrl+c
sigintHandler = async () => {
process.exitCode = 130;
console.error('Received SIGINT, cleaning up. Please wait.');
try {
await cancelBuildIfNecessary(build);
} catch (err) {
console.error(err.message);
} finally {
stream.end();
}
};
process.once('SIGINT', sigintHandler);
stream.on('data', getBuilderMessageHandler(build));
stream.on('end', resolve);
stream.on('error', reject);
});
} finally {
if (sigintHandler) {
process.removeListener('SIGINT', sigintHandler);
}
globalLogger.outputDeferredMessages();
}
await new Promise((resolve, reject) => {
const msgHandler = getBuilderMessageHandler(build);
stream.on('data', msgHandler);
stream.once('end', resolve);
stream.once('error', reject);
});
if (build.hadError) {
throw new RemoteBuildFailedError();
}
}
function handleHeadlessBuildMessage(message: HeadlessBuilderMessage) {
async function handleHeadlessBuildStream(stream: Stream.Stream) {
// We're running a headless build, which means we'll
// get a single object back, detailing if the build has
// been started
let message: HeadlessBuilderMessage;
try {
const response = await streamToPromise(stream as NodeJS.ReadWriteStream);
message = JSON.parse(response.toString());
} catch (e) {
if (e.code === 'SIGINT') {
throw e;
}
throw new Error(
`There was an error reading the response from the remote builder: ${e}`,
);
}
if (!process.stdout.isTTY) {
process.stdout.write(JSON.stringify(message));
return;
}
if (message.started) {
console.log('Build successfully started');
console.log(` Release ID: ${message.releaseId!}`);
@ -266,6 +262,9 @@ function getBuilderMessageHandler(
async function cancelBuildIfNecessary(build: RemoteBuild): Promise<void> {
if (build.releaseId != null) {
console.error(
`Setting 'cancelled' release status for release ID ${build.releaseId} ...`,
);
await build.sdk.pine.patch({
resource: 'release',
id: build.releaseId,
@ -352,7 +351,7 @@ function createRemoteBuildRequest(
headers: { 'Content-Encoding': 'gzip' },
body: tarStream.pipe(zlib.createGzip({ level: 6 })),
})
.on('error', onError)
.once('error', onError) // `.once` because the handler re-emits
.once('response', (response: request.RequestResponse) => {
if (response.statusCode >= 100 && response.statusCode < 400) {
if (DEBUG_MODE) {
@ -368,28 +367,31 @@ function createRemoteBuildRequest(
if (response.body) {
msgArr.push(response.body);
}
onError(new Error(msgArr.join('\n')));
onError(new ExpectedError(msgArr.join('\n')));
}
});
}
async function getRemoteBuildStream(
build: RemoteBuild,
): Promise<NodeJS.ReadWriteStream> {
): Promise<[request.Request, Stream.Stream]> {
const builderUrl = await getBuilderEndpoint(
build.baseUrl,
build.owner,
build.app,
build.opts,
);
let stream: Stream.Stream;
let uploadSpinner = {
stop: () => {
/* noop */
},
};
let exitOnError = (error: Error) => {
return exitWithExpectedError(error);
const onError = (error: Error) => {
uploadSpinner.stop();
if (stream) {
stream.emit('error', error);
}
};
// We only show the spinner when outputting to a tty
if (process.stdout.isTTY) {
@ -397,36 +399,26 @@ async function getRemoteBuildStream(
uploadSpinner = new visuals.Spinner(
'Uploading source package to balenaCloud',
);
exitOnError = (error: Error): never => {
uploadSpinner.stop();
return exitWithExpectedError(error);
};
// This is not strongly typed to start with, so we cast
// to any to allow the method call
(uploadSpinner as any).start();
}
try {
const tarStream = await getTarStream(build);
const buildRequest = createRemoteBuildRequest(
build,
tarStream,
builderUrl,
exitOnError,
);
let stream: NodeJS.ReadWriteStream;
if (build.opts.headless) {
stream = (buildRequest as unknown) as NodeJS.ReadWriteStream;
} else {
stream = buildRequest.pipe(JSONStream.parse('*'));
}
return stream
.once('close', () => uploadSpinner.stop())
.once('data', () => uploadSpinner.stop())
.once('end', () => uploadSpinner.stop())
.once('error', () => uploadSpinner.stop())
.once('finish', () => uploadSpinner.stop());
} catch (error) {
return exitOnError(error);
const tarStream = await getTarStream(build);
const buildRequest = createRemoteBuildRequest(
build,
tarStream,
builderUrl,
onError,
);
if (build.opts.headless) {
stream = buildRequest;
} else {
stream = buildRequest.pipe(JSONStream.parse('*'));
}
stream = stream
.once('error', () => uploadSpinner.stop())
.once('close', () => uploadSpinner.stop())
.once('data', () => uploadSpinner.stop())
.once('end', () => uploadSpinner.stop())
.once('finish', () => uploadSpinner.stop());
return [buildRequest, stream];
}