Merge pull request #2113 from balena-io/1828-livepush-connection-lost

Livepush, logs: Automatically reconnect on 'Connection to device lost'
This commit is contained in:
bulldozer-balena[bot] 2020-12-10 12:36:45 +00:00 committed by GitHub
commit ae3ccf759f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 441 additions and 197 deletions

View File

@ -1506,6 +1506,11 @@ device UUID, IP, or .local address
### Options
#### --max-retry MAX-RETRY
Maximum number of reconnection attempts on "connection lost" errors
(use 0 to disable auto reconnection).
#### -t, --tail
continuously stream output

View File

@ -23,6 +23,7 @@ import { LogMessage } from 'balena-sdk';
import { IArg } from '@oclif/parser/lib/args';
interface FlagsDef {
'max-retry'?: number;
tail?: boolean;
service?: string[];
system?: boolean;
@ -33,6 +34,8 @@ interface ArgsDef {
device: string;
}
const MAX_RETRY = 1000;
export default class LogsCmd extends Command {
public static description = stripIndent`
Show device logs.
@ -75,6 +78,11 @@ export default class LogsCmd extends Command {
public static usage = 'logs <device>';
public static flags: flags.Input<FlagsDef> = {
'max-retry': flags.integer({
description: stripIndent`
Maximum number of reconnection attempts on "connection lost" errors
(use 0 to disable auto reconnection).`,
}),
tail: flags.boolean({
default: false,
description: 'continuously stream output',
@ -105,7 +113,7 @@ export default class LogsCmd extends Command {
const balena = getBalenaSdk();
const { serviceIdToName } = await import('../utils/cloud');
const { displayDeviceLogs, displayLogObject } = await import(
const { connectAndDisplayDeviceLogs, displayLogObject } = await import(
'../utils/device/logs'
);
const { validateIPAddress, validateDotLocalUrl } = await import(
@ -153,13 +161,13 @@ export default class LogsCmd extends Command {
}
logger.logDebug('Streaming logs');
const logStream = await deviceApi.getLogStream();
await displayDeviceLogs(
logStream,
await connectAndDisplayDeviceLogs({
deviceApi,
logger,
options.system || false,
options.service,
);
system: options.system || false,
filterServices: options.service,
maxAttempts: 1 + (options['max-retry'] ?? MAX_RETRY),
});
} else {
// Logs from cloud
await Command.checkLoggedIn();

View File

@ -21,7 +21,10 @@ import { TypedError } from 'typed-error';
import { getChalk, stripIndent } from './utils/lazy';
import { getHelp } from './utils/messages';
export class ExpectedError extends TypedError {}
export class ExpectedError extends TypedError {
public code?: string;
public exitCode?: number;
}
export class NotLoggedInError extends ExpectedError {}
@ -39,6 +42,8 @@ export class NoPortsDefinedError extends ExpectedError {
}
}
export class SIGINTError extends ExpectedError {}
/**
* instanceOf is a more reliable implementation of the plain `instanceof`
* typescript operator, for use with TypedError errors when the error
@ -177,7 +182,7 @@ async function sentryCaptureException(error: Error) {
await Sentry.close(1000);
} catch (e) {
if (process.env.DEBUG) {
console.error('Timeout reporting error to sentry.io');
console.error('[debug] Timeout reporting error to sentry.io');
}
}
}
@ -212,8 +217,9 @@ export async function handleError(error: Error) {
if (!process.env.BALENARC_NO_SENTRY) {
await sentryCaptureException(error);
}
// Unhandled/unexpected error: ensure that the process terminates.
}
if (error instanceof SIGINTError || !isExpectedError) {
// SIGINT or unexpected error: ensure that the process terminates.
// The exit error code was set above through `process.exitCode`.
process.exit();
}
@ -247,6 +253,7 @@ export const printExpectedErrorMessage = function (message: string) {
* them and call this function.
*
* DEPRECATED: Use `throw new ExpectedError(<message>)` instead.
* If a specific process exit code x must be set, use process.exitCode = x
*/
export function exitWithExpectedError(message: string | Error): never {
if (message instanceof Error) {

View File

@ -366,15 +366,15 @@ export const pushAndUpdateServiceImages = function (
images.map(({ serviceImage, localImage, props, logs }, index) =>
Promise.all([
localImage.inspect().then((img) => img.Size),
retry(
retry({
// @ts-ignore
() => progress.push(localImage.name, reporters[index], opts),
3, // `times` - retry 3 times
func: () => progress.push(localImage.name, reporters[index], opts),
maxAttempts: 3, // try calling func 3 times (max)
// @ts-ignore
localImage.name, // `label` included in retry log messages
2000, // `delayMs` - wait 2 seconds before the 1st retry
1.4, // `backoffScaler` - wait multiplier for each retry
).finally(renderer.end),
label: localImage.name, // label for retry log messages
initialDelayMs: 2000, // wait 2 seconds before the 1st retry
backoffScaler: 1.4, // wait multiplier for each retry
}).finally(renderer.end),
])
.then(
/** @type {([number, string]) => void} */

View File

@ -28,6 +28,7 @@ import {
import type { Readable } from 'stream';
import { BALENA_ENGINE_TMP_PATH } from '../../config';
import { ExpectedError } from '../../errors';
import {
checkBuildSecretsRequirements,
loadProject,
@ -37,7 +38,6 @@ import {
import Logger = require('../logger');
import { DeviceAPI, DeviceInfo } from './api';
import * as LocalPushErrors from './errors';
import { DeviceAPIError } from './errors';
import LivepushManager from './live';
import { displayBuildLog } from './logs';
@ -76,7 +76,6 @@ async function environmentFromInput(
serviceNames: string[],
logger: Logger,
): Promise<ParsedEnvironment> {
const { exitWithExpectedError } = await import('../../errors');
// A normal environment variable regex, with an added part
// to find a colon followed servicename at the start
const varRegex = /^(?:([^\s:]+):)?([^\s]+?)=(.*)$/;
@ -92,7 +91,7 @@ async function environmentFromInput(
for (const env of envs) {
const maybeMatch = env.match(varRegex);
if (maybeMatch == null) {
exitWithExpectedError(`Unable to parse environment variable: ${env}`);
throw new ExpectedError(`Unable to parse environment variable: ${env}`);
}
const match = maybeMatch!;
let service: string | undefined;
@ -122,9 +121,6 @@ async function environmentFromInput(
}
export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
const { exitWithExpectedError } = await import('../../errors');
const { displayDeviceLogs } = await import('./logs');
// Resolve .local addresses to IP to avoid
// issue with Windows and rapid repeat lookups.
// see: https://github.com/balena-io/balena-cli/issues/1518
@ -144,7 +140,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
globalLogger.logDebug('Checking we can access device');
await api.ping();
} catch (e) {
exitWithExpectedError(
throw new ExpectedError(
`Could not communicate with local mode device at address ${opts.deviceHost}`,
);
}
@ -158,7 +154,7 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
const version = await api.getVersion();
globalLogger.logDebug(`Checking device version: ${version}`);
if (!semver.satisfies(version, '>=7.21.4')) {
exitWithExpectedError(versionError);
throw new ExpectedError(versionError);
}
if (!opts.nolive && !semver.satisfies(version, '>=9.7.0')) {
globalLogger.logWarn(
@ -169,8 +165,8 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
} catch (e) {
// Very old supervisor versions do not support /version endpoint
// a DeviceAPIError is expected in this case
if (e instanceof DeviceAPIError) {
exitWithExpectedError(versionError);
if (e instanceof LocalPushErrors.DeviceAPIError) {
throw new ExpectedError(versionError);
} else {
throw e;
}
@ -210,7 +206,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
if (!opts.nolive) {
buildLogs = {};
}
const buildTasks = await performBuilds(
const { awaitInterruptibleTask } = await import('../helpers');
const buildTasks = await awaitInterruptibleTask<typeof performBuilds>(
performBuilds,
project.composition,
tarStream,
docker,
@ -220,6 +219,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
buildLogs,
);
globalLogger.outputDeferredMessages();
// Print a newline to clearly separate build time and runtime
console.log();
const envs = await environmentFromInput(
opts.env,
Object.getOwnPropertyNames(project.composition.services),
@ -244,10 +247,11 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
// Now that we've set the target state, the device will do it's thing
// so we can either just display the logs, or start a livepush session
// (whilst also display logs)
const promises: Array<Promise<void>> = [streamDeviceLogs(api, opts)];
let livepush: LivepushManager | null = null;
if (!opts.nolive) {
// Print a newline to clear seperate build time and runtime
console.log();
const livepush = new LivepushManager({
livepush = new LivepushManager({
api,
buildContext: opts.source,
buildTasks,
@ -257,41 +261,40 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
buildLogs: buildLogs!,
deployOpts: opts,
});
const promises: Array<Promise<void>> = [livepush.init()];
// Only show logs if we're not detaching
if (!opts.detached) {
const logStream = await api.getLogStream();
globalLogger.logInfo('Streaming device logs...');
promises.push(
displayDeviceLogs(logStream, globalLogger, opts.system, opts.services),
);
} else {
promises.push(livepush.init());
if (opts.detached) {
globalLogger.logLivepush(
'Running in detached mode, no service logs will be shown',
);
}
globalLogger.logLivepush('Watching for file changes...');
globalLogger.outputDeferredMessages();
await Promise.all(promises);
} else {
if (opts.detached) {
return;
}
// Print an empty newline to separate the build output
// from the device output
console.log();
// Now all we need to do is stream back the logs
const logStream = await api.getLogStream();
globalLogger.logInfo('Streaming device logs...');
globalLogger.outputDeferredMessages();
await displayDeviceLogs(
logStream,
globalLogger,
opts.system,
opts.services,
);
}
try {
await awaitInterruptibleTask(() => Promise.all(promises));
} finally {
// Stop watching files after log streaming ends (e.g. on SIGINT)
livepush?.close();
await livepush?.cleanup();
}
}
async function streamDeviceLogs(
deviceApi: DeviceAPI,
opts: DeviceDeployOptions,
) {
// Only show logs if we're not detaching
if (opts.detached) {
return;
}
globalLogger.logInfo('Streaming device logs...');
const { connectAndDisplayDeviceLogs } = await import('./logs');
return connectAndDisplayDeviceLogs({
deviceApi,
logger: globalLogger,
system: opts.system || false,
filterServices: opts.services,
maxAttempts: 1001,
});
}
function connectToDocker(host: string, port: number): Docker {
@ -441,7 +444,9 @@ export async function rebuildSingleTask(
);
if (task == null) {
throw new Error(`Could not find build task for service ${serviceName}`);
throw new ExpectedError(
`Could not find build task for service ${serviceName}`,
);
}
await assignDockerBuildOpts(docker, [task], opts);
@ -610,8 +615,6 @@ export function generateTargetState(
}
async function inspectBuildResults(images: LocalImage[]): Promise<void> {
const { exitWithExpectedError } = await import('../../errors');
const failures: LocalPushErrors.BuildFailure[] = [];
_.each(images, (image) => {
@ -624,6 +627,6 @@ async function inspectBuildResults(images: LocalImage[]): Promise<void> {
});
if (failures.length > 0) {
exitWithExpectedError(new LocalPushErrors.BuildError(failures).toString());
throw new LocalPushErrors.BuildError(failures).toString();
}
}

View File

@ -1,12 +1,29 @@
/**
* @license
* Copyright 2018-2020 Balena Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as _ from 'lodash';
import { TypedError } from 'typed-error';
import { ExpectedError } from '../../errors';
export interface BuildFailure {
error: Error;
serviceName: string;
}
export class BuildError extends TypedError {
export class BuildError extends ExpectedError {
private failures: BuildFailure[];
public constructor(failures: BuildFailure[]) {
@ -33,7 +50,7 @@ export class BuildError extends TypedError {
}
}
export class DeviceAPIError extends TypedError {}
export class DeviceAPIError extends ExpectedError {}
export class BadRequestDeviceAPIError extends DeviceAPIError {}
export class ServiceUnavailableAPIError extends DeviceAPIError {}

View File

@ -219,24 +219,17 @@ export class LivepushManager {
this.rebuildsCancelled[serviceName] = false;
}
}
}
// Setup cleanup handlers for the device
// This is necessary because the `exit-hook` module is used by several
// dependencies, and will exit without calling the following handler.
// Once https://github.com/balena-io/balena-cli/issues/867 has been solved,
// we are free to (and definitely should) remove the below line
process.removeAllListeners('SIGINT');
process.on('SIGINT', async () => {
this.logger.logLivepush('Cleaning up device...');
await Promise.all(
_.map(this.containers, (container) => {
container.livepush.cleanupIntermediateContainers();
}),
);
process.exit(0);
});
/** Delete intermediate build containers from the device */
public async cleanup() {
this.logger.logLivepush('Cleaning up device...');
await Promise.all(
_.map(this.containers, (container) =>
container.livepush.cleanupIntermediateContainers(),
),
);
this.logger.logDebug('Cleaning up done.');
}
protected setupFilesystemWatcher(
@ -290,6 +283,17 @@ export class LivepushManager {
return monitor;
}
/** Stop the filesystem watcher, allowing the Node process to exit gracefully */
public close() {
for (const container of Object.values(this.containers)) {
container.monitor.close().catch((err) => {
if (process.env.DEBUG) {
this.logger.logDebug(`chokidar.close() ${err.message}`);
}
});
}
}
public static preprocessDockerfile(content: string): string {
return new Dockerfile(content).generateLiveDockerfile();
}

View File

@ -1,9 +1,33 @@
/**
* @license
* Copyright 2018-2020 Balena Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import ColorHash = require('color-hash');
import * as _ from 'lodash';
import type { Readable } from 'stream';
import { getChalk } from '../lazy';
import Logger = require('../logger');
import { ExpectedError, SIGINTError } from '../../errors';
class DeviceConnectionLostError extends ExpectedError {
public static defaultMsg = 'Connection to device lost';
constructor(msg?: string) {
super(msg || DeviceConnectionLostError.defaultMsg);
}
}
interface Log {
message: string;
@ -32,23 +56,81 @@ interface BuildLog {
* @param filterService Filter the logs so that only logs
* from a single service will be displayed
*/
export function displayDeviceLogs(
async function displayDeviceLogs(
logs: Readable,
logger: Logger,
system: boolean,
filterServices?: string[],
): Promise<void> {
return new Promise((resolve, reject) => {
logs.on('data', (log) => {
displayLogLine(log, logger, system, filterServices);
const { addSIGINTHandler } = await import('../helpers');
let gotSignal = false;
const handleSignal = () => {
gotSignal = true;
logs.emit('close');
};
addSIGINTHandler(handleSignal);
process.once('SIGTERM', handleSignal);
try {
await new Promise((_resolve, reject) => {
logs.on('data', (log) => {
displayLogLine(log, logger, system, filterServices);
});
logs.once('error', reject);
logs.once('end', () => {
logger.logWarn(DeviceConnectionLostError.defaultMsg);
if (gotSignal) {
reject(new SIGINTError('Log streaming aborted on SIGINT signal'));
} else {
reject(new DeviceConnectionLostError());
}
});
});
} finally {
process.removeListener('SIGINT', handleSignal);
process.removeListener('SIGTERM', handleSignal);
}
}
logs.on('error', reject);
logs.on('end', () => {
logger.logError('Connection to device lost');
resolve();
/**
* Open a TCP connection to the device's supervisor (TCP port 48484) and tail
* (display) device logs. Retry (reconnect) up to maxAttempts times if the
* TCP connection drops. Don't retry on SIGINT (CTRL-C).
* See function `displayDeviceLogs` for parameter documentation.
*/
export async function connectAndDisplayDeviceLogs({
deviceApi,
logger,
system,
filterServices,
maxAttempts = 3,
}: {
deviceApi: import('./api').DeviceAPI;
logger: Logger;
system: boolean;
filterServices?: string[];
maxAttempts?: number;
}) {
async function connectAndDisplay() {
// Open a new connection to the device's supervisor, TCP port 48484
const logStream = await deviceApi.getLogStream();
return displayDeviceLogs(logStream, logger, system, filterServices);
}
const { retry } = await import('../../utils/helpers');
try {
await retry({
func: connectAndDisplay,
maxAttempts,
label: 'Streaming logs',
});
});
} catch (err) {
if (err instanceof DeviceConnectionLostError) {
err.message = `Max retry count (${
maxAttempts - 1
}) exceeded while attempting to reconnect to the device`;
}
throw err;
}
}
export function displayBuildLog(log: BuildLog, logger: Logger): void {

View File

@ -22,7 +22,7 @@ import * as os from 'os';
import type * as ShellEscape from 'shell-escape';
import type { Device, PineOptions } from 'balena-sdk';
import { ExpectedError } from '../errors';
import { ExpectedError, SIGINTError } from '../errors';
import { getBalenaSdk, getChalk, getVisuals } from './lazy';
import { promisify } from 'util';
import { isSubcommand } from '../preparser';
@ -204,39 +204,66 @@ function getApplication(
) as Promise<ApplicationWithDeviceType>;
}
const second = 1000; // 1000 milliseconds
const minute = 60 * second;
export const delay = promisify(setTimeout);
/**
* Call `func`, and if func() throws an error or returns a promise that
* eventually rejects, retry it `times` many times, each time printing a
* log message including the given `label` and the error that led to
* retrying. Wait delayMs before the first retry, multiplying the wait
* by backoffScaler for each further attempt.
* eventually rejects, retry it `times` many times, each time printing a log
* message including the given `label` and the error that led to retrying.
* Wait initialDelayMs before the first retry. Before each further retry,
* the delay is reduced by the time elapsed since the last retry, and
* increased by multiplying the result by backoffScaler.
* @param func: The function to call and, if needed, retry calling
* @param times: How many times to retry calling func()
* @param maxAttempts: How many times (max) to try calling func().
* func() will always be called at least once.
* @param label: Label to include in the retry log message
* @param startingDelayMs: How long to wait before the first retry
* @param initialDelayMs: How long to wait before the first retry
* @param backoffScaler: Multiplier to previous wait time
* @param count: Used "internally" for the recursive calls
* @param maxSingleDelayMs: Maximum interval between retries
*/
export async function retry<T>(
func: () => T,
times: number,
label: string,
startingDelayMs = 1000,
export async function retry<T>({
func,
maxAttempts,
label,
initialDelayMs = 1000,
backoffScaler = 2,
): Promise<T> {
for (let count = 0; count < times - 1; count++) {
maxSingleDelayMs = 1 * minute,
}: {
func: () => T;
maxAttempts: number;
label: string;
initialDelayMs?: number;
backoffScaler?: number;
maxSingleDelayMs?: number;
}): Promise<T> {
let delayMs = initialDelayMs;
for (let count = 0; count < maxAttempts - 1; count++) {
const lastAttemptMs = Date.now();
try {
return await func();
} catch (err) {
const delayMS = backoffScaler ** count * startingDelayMs;
// Don't retry on SIGINT (CTRL-C)
if (err instanceof SIGINTError) {
throw err;
}
if (count) {
// use Math.max to work around system time changes, e.g. DST
const elapsedMs = Math.max(0, Date.now() - lastAttemptMs);
// reduce delayMs by the time elapsed since the last attempt
delayMs = Math.max(initialDelayMs, delayMs - elapsedMs);
// increase delayMs by the backoffScaler factor
delayMs = Math.min(maxSingleDelayMs, delayMs * backoffScaler);
}
const sec = delayMs / 1000;
const secStr = sec < 10 ? sec.toFixed(1) : Math.round(sec).toString();
console.log(
`Retrying "${label}" after ${(delayMS / 1000).toFixed(2)}s (${
count + 1
} of ${times}) due to: ${err}`,
`Retrying "${label}" after ${secStr}s (${count + 1} of ${
maxAttempts - 1
}) due to: ${err}`,
);
await delay(delayMS);
await delay(delayMs);
}
}
return await func();
@ -490,3 +517,61 @@ export const expandForAppName: PineOptions<Device> = {
is_running__release: { $select: 'commit' },
},
};
/**
* Use the `readline` library on Windows to install SIGINT handlers.
* This appears to be necessary on MSYS / Git for Windows, and also useful
* with PowerShell to avoid the built-in "Terminate batch job? (Y/N)" prompt
* that appears to result in ungraceful / abrupt process termination.
*/
const installReadlineSigintEmitter = _.once(function emitSigint() {
if (process.platform === 'win32') {
const readline = require('readline') as typeof import('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.on('SIGINT', () => process.emit('SIGINT' as any));
}
});
/**
* Centralized cross-platform logic to install a SIGINT handler
* @param sigintHandler The handler function
* @param once Whether the handler should be called no more than once
*/
export function addSIGINTHandler(sigintHandler: () => void, once = true) {
installReadlineSigintEmitter();
if (once) {
process.once('SIGINT', sigintHandler);
} else {
process.on('SIGINT', sigintHandler);
}
}
/**
* Call the given task function (which returns a promise) with the given
* arguments, await the returned promise and resolve to the same result.
* While awaiting for that promise, also await for a SIGINT signal (if any),
* with a new SIGINT handler that is automatically removed on return.
* If a SIGINT signal is received while awaiting for the task function,
* immediately return a promise that rejects with SIGINTError.
* @param task An async function to be executed and awaited
* @param theArgs Arguments to be passed to the task function
*/
export async function awaitInterruptibleTask<
T extends (...args: any[]) => Promise<any>
>(task: T, ...theArgs: Parameters<T>): Promise<ReturnType<T>> {
let sigintHandler: () => void = () => undefined;
const sigintPromise = new Promise<T>((_resolve, reject) => {
sigintHandler = () => {
reject(new SIGINTError('Task aborted on SIGINT signal'));
};
addSIGINTHandler(sigintHandler);
});
try {
return await Promise.race([sigintPromise, task(...theArgs)]);
} finally {
process.removeListener('SIGINT', sigintHandler);
}
}

View File

@ -22,8 +22,7 @@ import type * as Stream from 'stream';
import streamToPromise = require('stream-to-promise');
import type { Pack } from 'tar-stream';
import { ExpectedError } from '../errors';
import { exitWithExpectedError } from '../errors';
import { ExpectedError, SIGINTError } from '../errors';
import { tarDirectory } from './compose_ts';
import { getVisuals, stripIndent } from './lazy';
import Logger = require('./logger');
@ -110,68 +109,76 @@ async function getBuilderEndpoint(
}
export async function startRemoteBuild(build: RemoteBuild): Promise<void> {
const stream = await getRemoteBuildStream(build);
const [buildRequest, stream] = await getRemoteBuildStream(build);
// Special windows handling (win64 also reports win32)
if (process.platform === 'win32') {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
// Setup CTRL-C handler so the user can interrupt the build
let cancellationPromise = Promise.resolve();
const sigintHandler = () => {
process.exitCode = 130;
console.error('\nReceived SIGINT, cleaning up. Please wait.');
try {
cancellationPromise = cancelBuildIfNecessary(build);
} catch (err) {
console.error(err.message);
} finally {
buildRequest.abort();
const sigintErr = new SIGINTError('Build aborted on SIGINT signal');
sigintErr.code = 'SIGINT';
stream.emit('error', sigintErr);
}
};
rl.on('SIGINT', () => process.emit('SIGINT' as any));
const { addSIGINTHandler } = await import('./helpers');
addSIGINTHandler(sigintHandler);
try {
if (build.opts.headless) {
await handleHeadlessBuildStream(stream);
} else {
await handleRemoteBuildStream(build, stream);
}
} finally {
process.removeListener('SIGINT', sigintHandler);
globalLogger.outputDeferredMessages();
await cancellationPromise;
}
}
if (!build.opts.headless) {
return new Promise((resolve, reject) => {
// Setup interrupt handlers so we can cancel the build if the user presses
// ctrl+c
// This is necessary because the `exit-hook` module is used by several
// dependencies, and will exit without calling the following handler.
// Once https://github.com/balena-io/balena-cli/issues/867 has been solved,
// we are free to (and definitely should) remove the below line
process.removeAllListeners('SIGINT');
process.on('SIGINT', () => {
process.stderr.write('Received SIGINT, cleaning up. Please wait.\n');
cancelBuildIfNecessary(build).then(() => {
stream.end();
process.exit(130);
});
});
stream.on('data', getBuilderMessageHandler(build));
stream.on('end', resolve);
stream.on('error', reject);
}).then(() => {
globalLogger.outputDeferredMessages();
if (build.hadError) {
throw new RemoteBuildFailedError();
}
});
async function handleRemoteBuildStream(
build: RemoteBuild,
stream: Stream.Stream,
) {
await new Promise((resolve, reject) => {
const msgHandler = getBuilderMessageHandler(build);
stream.on('data', msgHandler);
stream.once('end', resolve);
stream.once('error', reject);
});
if (build.hadError) {
throw new RemoteBuildFailedError();
}
}
async function handleHeadlessBuildStream(stream: Stream.Stream) {
// We're running a headless build, which means we'll
// get a single object back, detailing if the build has
// been started
let result: HeadlessBuilderMessage;
let message: HeadlessBuilderMessage;
try {
const response = await streamToPromise(stream);
result = JSON.parse(response.toString());
const response = await streamToPromise(stream as NodeJS.ReadWriteStream);
message = JSON.parse(response.toString());
} catch (e) {
if (e.code === 'SIGINT') {
throw e;
}
throw new Error(
`There was an error reading the response from the remote builder: ${e}`,
);
}
handleHeadlessBuildMessage(result);
}
function handleHeadlessBuildMessage(message: HeadlessBuilderMessage) {
if (!process.stdout.isTTY) {
process.stdout.write(JSON.stringify(message));
return;
}
if (message.started) {
console.log('Build successfully started');
console.log(` Release ID: ${message.releaseId!}`);
@ -255,6 +262,9 @@ function getBuilderMessageHandler(
async function cancelBuildIfNecessary(build: RemoteBuild): Promise<void> {
if (build.releaseId != null) {
console.error(
`Setting 'cancelled' release status for release ID ${build.releaseId} ...`,
);
await build.sdk.pine.patch({
resource: 'release',
id: build.releaseId,
@ -341,7 +351,7 @@ function createRemoteBuildRequest(
headers: { 'Content-Encoding': 'gzip' },
body: tarStream.pipe(zlib.createGzip({ level: 6 })),
})
.on('error', onError)
.once('error', onError) // `.once` because the handler re-emits
.once('response', (response: request.RequestResponse) => {
if (response.statusCode >= 100 && response.statusCode < 400) {
if (DEBUG_MODE) {
@ -357,28 +367,31 @@ function createRemoteBuildRequest(
if (response.body) {
msgArr.push(response.body);
}
onError(new Error(msgArr.join('\n')));
onError(new ExpectedError(msgArr.join('\n')));
}
});
}
async function getRemoteBuildStream(
build: RemoteBuild,
): Promise<NodeJS.ReadWriteStream> {
): Promise<[request.Request, Stream.Stream]> {
const builderUrl = await getBuilderEndpoint(
build.baseUrl,
build.owner,
build.app,
build.opts,
);
let stream: Stream.Stream;
let uploadSpinner = {
stop: () => {
/* noop */
},
};
let exitOnError = (error: Error) => {
return exitWithExpectedError(error);
const onError = (error: Error) => {
uploadSpinner.stop();
if (stream) {
stream.emit('error', error);
}
};
// We only show the spinner when outputting to a tty
if (process.stdout.isTTY) {
@ -386,36 +399,26 @@ async function getRemoteBuildStream(
uploadSpinner = new visuals.Spinner(
'Uploading source package to balenaCloud',
);
exitOnError = (error: Error): never => {
uploadSpinner.stop();
return exitWithExpectedError(error);
};
// This is not strongly typed to start with, so we cast
// to any to allow the method call
(uploadSpinner as any).start();
}
try {
const tarStream = await getTarStream(build);
const buildRequest = createRemoteBuildRequest(
build,
tarStream,
builderUrl,
exitOnError,
);
let stream: NodeJS.ReadWriteStream;
if (build.opts.headless) {
stream = (buildRequest as unknown) as NodeJS.ReadWriteStream;
} else {
stream = buildRequest.pipe(JSONStream.parse('*'));
}
return stream
.once('close', () => uploadSpinner.stop())
.once('data', () => uploadSpinner.stop())
.once('end', () => uploadSpinner.stop())
.once('error', () => uploadSpinner.stop())
.once('finish', () => uploadSpinner.stop());
} catch (error) {
return exitOnError(error);
const tarStream = await getTarStream(build);
const buildRequest = createRemoteBuildRequest(
build,
tarStream,
builderUrl,
onError,
);
if (build.opts.headless) {
stream = buildRequest;
} else {
stream = buildRequest.pipe(JSONStream.parse('*'));
}
stream = stream
.once('error', () => uploadSpinner.stop())
.once('close', () => uploadSpinner.stop())
.once('data', () => uploadSpinner.stop())
.once('end', () => uploadSpinner.stop())
.once('finish', () => uploadSpinner.stop());
return [buildRequest, stream];
}

View File

@ -0,0 +1,17 @@
diff --git a/node_modules/exit-hook/index.js b/node_modules/exit-hook/index.js
index e18013f..3366356 100644
--- a/node_modules/exit-hook/index.js
+++ b/node_modules/exit-hook/index.js
@@ -14,9 +14,9 @@ function exit(exit, signal) {
el();
});
- if (exit === true) {
- process.exit(128 + signal);
- }
+ // if (exit === true) {
+ // process.exit(128 + signal);
+ // }
};
module.exports = function (cb) {

View File

@ -46,20 +46,33 @@ describe('balena logs', function () {
itS('should reach the expected endpoints on a local device', async () => {
supervisor.expectGetPing();
supervisor.expectGetLogs();
supervisor.expectGetLogs();
const { err, out } = await runCommand('logs 1.2.3.4');
const { err, out } = await runCommand('logs 1.2.3.4 --max-retry 1');
expect(err).to.be.empty;
const errLines = cleanOutput(err, true);
const errMsg =
'Max retry count (1) exceeded while attempting to reconnect to the device';
if (process.env.DEBUG) {
expect(errLines).to.include(errMsg);
} else {
expect(errLines).to.have.members([errMsg]);
}
const removeTimestamps = (logLine: string) =>
logLine.replace(/(?<=\[Logs\]) \[.+?\]/, '');
const cleanedOut = cleanOutput(out, true).map((l) => removeTimestamps(l));
expect(cleanedOut).to.deep.equal([
expect(cleanedOut).to.have.members([
'[Logs] Streaming logs',
'[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Error] Connection to device lost',
'[Warn] Connection to device lost',
'Retrying "Streaming logs" after 1.0s (1 of 1) due to: DeviceConnectionLostError: Connection to device lost',
'[Logs] Streaming logs',
'[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Warn] Connection to device lost',
]);
});
});