Livepush, logs: Automatically reconnect on 'Connection to device lost'

Change-type: minor
This commit is contained in:
Paulo Castro 2020-11-07 22:28:14 +00:00
parent d00db5ea8c
commit 54731c2d20
8 changed files with 205 additions and 83 deletions

View File

@ -1506,6 +1506,11 @@ device UUID, IP, or .local address
### Options
#### --max-retry MAX-RETRY
Maximum number of reconnection attempts on "connection lost" errors
(use 0 to disable auto reconnection).
#### -t, --tail
continuously stream output

View File

@ -23,6 +23,7 @@ import { LogMessage } from 'balena-sdk';
import { IArg } from '@oclif/parser/lib/args';
interface FlagsDef {
'max-retry'?: number;
tail?: boolean;
service?: string[];
system?: boolean;
@ -33,6 +34,8 @@ interface ArgsDef {
device: string;
}
const MAX_RETRY = 1000;
export default class LogsCmd extends Command {
public static description = stripIndent`
Show device logs.
@ -75,6 +78,11 @@ export default class LogsCmd extends Command {
public static usage = 'logs <device>';
public static flags: flags.Input<FlagsDef> = {
'max-retry': flags.integer({
description: stripIndent`
Maximum number of reconnection attempts on "connection lost" errors
(use 0 to disable auto reconnection).`,
}),
tail: flags.boolean({
default: false,
description: 'continuously stream output',
@ -105,7 +113,7 @@ export default class LogsCmd extends Command {
const balena = getBalenaSdk();
const { serviceIdToName } = await import('../utils/cloud');
const { displayDeviceLogs, displayLogObject } = await import(
const { connectAndDisplayDeviceLogs, displayLogObject } = await import(
'../utils/device/logs'
);
const { validateIPAddress, validateDotLocalUrl } = await import(
@ -153,13 +161,13 @@ export default class LogsCmd extends Command {
}
logger.logDebug('Streaming logs');
const logStream = await deviceApi.getLogStream();
await displayDeviceLogs(
logStream,
await connectAndDisplayDeviceLogs({
deviceApi,
logger,
options.system || false,
options.service,
);
system: options.system || false,
filterServices: options.service,
maxAttempts: 1 + (options['max-retry'] ?? MAX_RETRY),
});
} else {
// Logs from cloud
await Command.checkLoggedIn();

View File

@ -366,15 +366,15 @@ export const pushAndUpdateServiceImages = function (
images.map(({ serviceImage, localImage, props, logs }, index) =>
Promise.all([
localImage.inspect().then((img) => img.Size),
retry(
retry({
// @ts-ignore
() => progress.push(localImage.name, reporters[index], opts),
3, // `times` - retry 3 times
func: () => progress.push(localImage.name, reporters[index], opts),
maxAttempts: 3, // try calling func 3 times (max)
// @ts-ignore
localImage.name, // `label` included in retry log messages
2000, // `delayMs` - wait 2 seconds before the 1st retry
1.4, // `backoffScaler` - wait multiplier for each retry
).finally(renderer.end),
label: localImage.name, // label for retry log messages
initialDelayMs: 2000, // wait 2 seconds before the 1st retry
backoffScaler: 1.4, // wait multiplier for each retry
}).finally(renderer.end),
])
.then(
/** @type {([number, string]) => void} */

View File

@ -123,7 +123,6 @@ async function environmentFromInput(
export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
const { exitWithExpectedError } = await import('../../errors');
const { displayDeviceLogs } = await import('./logs');
// Resolve .local addresses to IP to avoid
// issue with Windows and rapid repeat lookups.
@ -220,6 +219,10 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
buildLogs,
);
globalLogger.outputDeferredMessages();
// Print a newline to clearly separate build time and runtime
console.log();
const envs = await environmentFromInput(
opts.env,
Object.getOwnPropertyNames(project.composition.services),
@ -244,10 +247,11 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
// Now that we've set the target state, the device will do it's thing
// so we can either just display the logs, or start a livepush session
// (whilst also display logs)
const promises: Array<Promise<void>> = [streamDeviceLogs(api, opts)];
let livepush: LivepushManager | null = null;
if (!opts.nolive) {
// Print a newline to clear seperate build time and runtime
console.log();
const livepush = new LivepushManager({
livepush = new LivepushManager({
api,
buildContext: opts.source,
buildTasks,
@ -257,43 +261,37 @@ export async function deployToDevice(opts: DeviceDeployOptions): Promise<void> {
buildLogs: buildLogs!,
deployOpts: opts,
});
const promises: Array<Promise<void>> = [livepush.init()];
// Only show logs if we're not detaching
if (!opts.detached) {
const logStream = await api.getLogStream();
globalLogger.logInfo('Streaming device logs...');
promises.push(
displayDeviceLogs(logStream, globalLogger, opts.system, opts.services),
);
} else {
promises.push(livepush.init());
if (opts.detached) {
globalLogger.logLivepush(
'Running in detached mode, no service logs will be shown',
);
}
globalLogger.logLivepush('Watching for file changes...');
globalLogger.outputDeferredMessages();
await Promise.all(promises);
livepush.close();
} else {
if (opts.detached) {
return;
}
// Print an empty newline to separate the build output
// from the device output
console.log();
// Now all we need to do is stream back the logs
const logStream = await api.getLogStream();
globalLogger.logInfo('Streaming device logs...');
globalLogger.outputDeferredMessages();
await displayDeviceLogs(
logStream,
globalLogger,
opts.system,
opts.services,
);
}
await Promise.all(promises).finally(() => {
// Stop watching files after log streaming ends (e.g. on SIGINT)
livepush?.close();
});
}
async function streamDeviceLogs(
deviceApi: DeviceAPI,
opts: DeviceDeployOptions,
) {
// Only show logs if we're not detaching
if (opts.detached) {
return;
}
globalLogger.logInfo('Streaming device logs...');
const { connectAndDisplayDeviceLogs } = await import('./logs');
return connectAndDisplayDeviceLogs({
deviceApi,
logger: globalLogger,
system: opts.system || false,
filterServices: opts.services,
maxAttempts: 1001,
});
}
function connectToDocker(host: string, port: number): Docker {

View File

@ -283,6 +283,7 @@ export class LivepushManager {
return monitor;
}
/** Stop the filesystem watcher, allowing the Node process to exit gracefully */
public close() {
for (const container of Object.values(this.containers)) {
container.monitor.close().catch((err) => {

View File

@ -1,9 +1,33 @@
/**
* @license
* Copyright 2018-2020 Balena Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import ColorHash = require('color-hash');
import * as _ from 'lodash';
import type { Readable } from 'stream';
import { getChalk } from '../lazy';
import Logger = require('../logger');
import { ExpectedError } from '../../errors';
class DeviceConnectionLostError extends ExpectedError {
public static defaultMsg = 'Connection to device lost';
constructor(msg?: string) {
super(msg || DeviceConnectionLostError.defaultMsg);
}
}
interface Log {
message: string;
@ -32,24 +56,74 @@ interface BuildLog {
* @param filterService Filter the logs so that only logs
* from a single service will be displayed
*/
export function displayDeviceLogs(
async function displayDeviceLogs(
logs: Readable,
logger: Logger,
system: boolean,
filterServices?: string[],
): Promise<void> {
return new Promise((resolve, reject) => {
logs.on('data', (log) => {
displayLogLine(log, logger, system, filterServices);
let gotSignal = false;
const handleSignal = () => {
gotSignal = true;
logs.emit('close');
};
process.once('SIGINT', handleSignal);
process.once('SIGTERM', handleSignal);
try {
await new Promise((resolve, reject) => {
logs.on('data', (log) => {
displayLogLine(log, logger, system, filterServices);
});
logs.once('error', reject);
logs.once('end', () => {
logger.logWarn(DeviceConnectionLostError.defaultMsg);
if (gotSignal) {
resolve();
} else {
reject(new DeviceConnectionLostError());
}
});
});
logs.once('error', reject);
logs.once('end', () => {
logger.logWarn('Connection to device lost');
resolve();
} finally {
process.removeListener('SIGINT', handleSignal);
process.removeListener('SIGTERM', handleSignal);
}
}
export async function connectAndDisplayDeviceLogs({
deviceApi,
logger,
system,
filterServices,
maxAttempts = 3,
}: {
deviceApi: import('./api').DeviceAPI;
logger: Logger;
system: boolean;
filterServices?: string[];
maxAttempts?: number;
}) {
async function connectAndDisplay() {
// Open a new connection to the device's supervisor, TCP port 48484
const logStream = await deviceApi.getLogStream();
return displayDeviceLogs(logStream, logger, system, filterServices);
}
const { retry } = await import('../../utils/helpers');
try {
await retry({
func: connectAndDisplay,
maxAttempts,
label: 'Streaming logs',
});
process.once('SIGINT', () => logs.emit('close'));
process.once('SIGTERM', () => logs.emit('close'));
});
} catch (err) {
if (err instanceof DeviceConnectionLostError) {
err.message = `Max retry count (${
maxAttempts - 1
}) exceeded while attempting to reconnect to the device`;
}
throw err;
}
}
export function displayBuildLog(log: BuildLog, logger: Logger): void {

View File

@ -204,39 +204,62 @@ function getApplication(
) as Promise<ApplicationWithDeviceType>;
}
const second = 1000; // 1000 milliseconds
const minute = 60 * second;
export const delay = promisify(setTimeout);
/**
* Call `func`, and if func() throws an error or returns a promise that
* eventually rejects, retry it `times` many times, each time printing a
* log message including the given `label` and the error that led to
* retrying. Wait delayMs before the first retry, multiplying the wait
* by backoffScaler for each further attempt.
* eventually rejects, retry it `times` many times, each time printing a log
* message including the given `label` and the error that led to retrying.
* Wait initialDelayMs before the first retry. Before each further retry,
* the delay is reduced by the time elapsed since the last retry, and
* increased by multiplying the result by backoffScaler.
* @param func: The function to call and, if needed, retry calling
* @param times: How many times to retry calling func()
* @param maxAttempts: How many times (max) to try calling func().
* func() will always be called at least once.
* @param label: Label to include in the retry log message
* @param startingDelayMs: How long to wait before the first retry
* @param initialDelayMs: How long to wait before the first retry
* @param backoffScaler: Multiplier to previous wait time
* @param count: Used "internally" for the recursive calls
* @param maxSingleDelayMs: Maximum interval between retries
*/
export async function retry<T>(
func: () => T,
times: number,
label: string,
startingDelayMs = 1000,
export async function retry<T>({
func,
maxAttempts,
label,
initialDelayMs = 1000,
backoffScaler = 2,
): Promise<T> {
for (let count = 0; count < times - 1; count++) {
maxSingleDelayMs = 1 * minute,
}: {
func: () => T;
maxAttempts: number;
label: string;
initialDelayMs?: number;
backoffScaler?: number;
maxSingleDelayMs?: number;
}): Promise<T> {
let delayMs = initialDelayMs;
for (let count = 0; count < maxAttempts - 1; count++) {
const lastAttemptMs = Date.now();
try {
return await func();
} catch (err) {
const delayMS = backoffScaler ** count * startingDelayMs;
if (count) {
// use Math.max to work around system time changes, e.g. DST
const elapsedMs = Math.max(0, Date.now() - lastAttemptMs);
// reduce delayMs by the time elapsed since the last attempt
delayMs = Math.max(initialDelayMs, delayMs - elapsedMs);
// increase delayMs by the backoffScaler factor
delayMs = Math.min(maxSingleDelayMs, delayMs * backoffScaler);
}
const sec = delayMs / 1000;
const secStr = sec < 10 ? sec.toFixed(1) : Math.round(sec).toString();
console.log(
`Retrying "${label}" after ${(delayMS / 1000).toFixed(2)}s (${
count + 1
} of ${times}) due to: ${err}`,
`Retrying "${label}" after ${secStr}s (${count + 1} of ${
maxAttempts - 1
}) due to: ${err}`,
);
await delay(delayMS);
await delay(delayMs);
}
}
return await func();

View File

@ -46,20 +46,33 @@ describe('balena logs', function () {
itS('should reach the expected endpoints on a local device', async () => {
supervisor.expectGetPing();
supervisor.expectGetLogs();
supervisor.expectGetLogs();
const { err, out } = await runCommand('logs 1.2.3.4');
const { err, out } = await runCommand('logs 1.2.3.4 --max-retry 1');
expect(err).to.be.empty;
const errLines = cleanOutput(err, true);
const errMsg =
'Max retry count (1) exceeded while attempting to reconnect to the device';
if (process.env.DEBUG) {
expect(errLines).to.include(errMsg);
} else {
expect(errLines).to.have.members([errMsg]);
}
const removeTimestamps = (logLine: string) =>
logLine.replace(/(?<=\[Logs\]) \[.+?\]/, '');
const cleanedOut = cleanOutput(out, true).map((l) => removeTimestamps(l));
expect(cleanedOut).to.deep.equal([
expect(cleanedOut).to.have.members([
'[Logs] Streaming logs',
'[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Error] Connection to device lost',
'[Warn] Connection to device lost',
'Retrying "Streaming logs" after 1.0s (1 of 1) due to: DeviceConnectionLostError: Connection to device lost',
'[Logs] Streaming logs',
'[Logs] [bar] bar 8 (332) Linux 4e3f81149d71 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Logs] [foo] foo 8 (200) Linux cc5df60d89ee 4.19.75 #1 SMP PREEMPT Mon Mar 23 11:50:49 UTC 2020 aarch64 GNU/Linux',
'[Warn] Connection to device lost',
]);
});
});