balena-cli/lib/utils/device/api.ts
Cameron Diver 96c975d17e
Use TCP keepalive probes to detect local log stream closing
Change-type: patch
Closes: #1219
Signed-off-by: Cameron Diver <cameron@balena.io>
2019-05-14 11:39:57 +01:00

259 lines
6.1 KiB
TypeScript

/**
* @license
* Copyright 2019 Balena Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as Bluebird from 'bluebird';
import * as _ from 'lodash';
import { NodeJSSocketWithFileDescriptor } from 'net-keepalive';
import * as os from 'os';
import * as request from 'request';
import * as Stream from 'stream';
import Logger = require('../logger');
import * as ApiErrors from './errors';
/**
 * Generic shape of a JSON response body: a mandatory `status` flag, an
 * optional human-readable `message`, and any number of additional
 * endpoint-specific properties.
 */
export interface DeviceResponse {
	/** Outcome flag reported in the response body. */
	status: 'success' | 'failed';
	/** Optional explanatory text accompanying the status. */
	message?: string;
	/** Any further properties carried by the response. */
	[key: string]: any;
}
/**
 * Device description as returned by `DeviceAPI.getDeviceInformation()`
 * (the `info` property of the device-info response body).
 */
export interface DeviceInfo {
	/** Architecture identifier reported by the device. */
	arch: string;
	/** Device type identifier reported by the device. */
	deviceType: string;
}
/**
 * State snapshot as returned by `DeviceAPI.getStatus()`, i.e. the body of the
 * status endpoint with its `status` property removed.
 */
export interface Status {
	/** Whether the target state has been applied or is still being applied. */
	appState: 'applied' | 'applying';
	/** Aggregate download progress; `null` when no value is reported. */
	overallDownloadProgress: number | null;
	/** One entry per container reported by the device. */
	containers: {
		status: string;
		serviceName: string;
		appId: number;
		imageId: number;
		serviceId: number;
		containerId: string;
		createdAt: string;
	}[];
	/** One entry per image reported by the device. */
	images: {
		name: string;
		appId: number;
		serviceName: string;
		imageId: number;
		dockerImageId: string;
		status: string;
		/** Per-image download progress; `null` when no value is reported. */
		downloadProgress: number | null;
	}[];
}
/**
 * Relative paths of the device endpoints used by `DeviceAPI`, keyed by the
 * action name passed to `getUrlForAction`. Paths carry no leading slash
 * because the base address they are appended to already ends with one.
 */
const deviceEndpoints = {
	// Health / metadata
	ping: 'ping',
	version: 'v2/version',
	status: 'v2/state/status',

	// Local-mode endpoints; reading and writing the target state share a path
	getTargetState: 'v2/local/target-state',
	setTargetState: 'v2/local/target-state',
	getDeviceInformation: 'v2/local/device-info',
	logs: 'v2/local/logs',
};
/**
 * Small HTTP client for the API a device exposes at `http://<addr>:<port>/`.
 *
 * All non-streaming calls go through `promisifiedRequest`, which maps HTTP
 * status codes onto the error classes in `./errors`. The log endpoint is
 * streamed and therefore handled separately in `getLogStream`.
 */
export class DeviceAPI {
	/** Base URL of the device API, always terminated by a `/`. */
	private deviceAddress: string;

	/**
	 * @param logger Logger used for debug output of outgoing requests
	 * @param addr Host name or IP address of the device
	 * @param port TCP port of the device API (defaults to 48484)
	 */
	public constructor(
		private logger: Logger,
		addr: string,
		port: number = 48484,
	) {
		this.deviceAddress = `http://${addr}:${port}/`;
	}

	/**
	 * POST a new target state to the device.
	 *
	 * Either resolves with nothing, or rejects with an error carrying the
	 * information returned by the device.
	 *
	 * @param state Target state object, sent as the JSON request body
	 */
	public async setTargetState(state: any): Promise<void> {
		const url = this.getUrlForAction('setTargetState');
		return DeviceAPI.promisifiedRequest(
			request.post,
			{
				url,
				json: true,
				body: state,
			},
			this.logger,
		);
	}

	/**
	 * Fetch the current target state from the device.
	 *
	 * @returns The `state` property of the response body
	 */
	public async getTargetState(): Promise<any> {
		const url = this.getUrlForAction('getTargetState');

		return DeviceAPI.promisifiedRequest(
			request.get,
			{
				url,
				json: true,
			},
			this.logger,
		).then(body => {
			return body.state;
		});
	}

	/**
	 * Fetch the device type and architecture from the device.
	 *
	 * @returns The `info` property of the response body
	 */
	public async getDeviceInformation(): Promise<DeviceInfo> {
		const url = this.getUrlForAction('getDeviceInformation');

		return DeviceAPI.promisifiedRequest(
			request.get,
			{
				url,
				json: true,
			},
			this.logger,
		).then(body => {
			return body.info;
		});
	}

	/**
	 * Issue a GET against the ping endpoint. Resolves on a 200 response and
	 * rejects otherwise.
	 */
	public async ping(): Promise<void> {
		const url = this.getUrlForAction('ping');

		return DeviceAPI.promisifiedRequest(
			request.get,
			{
				url,
			},
			this.logger,
		);
	}

	/**
	 * Fetch the version string from the version endpoint.
	 *
	 * NOTE(review): no logger is passed to `promisifiedRequest` here, so this
	 * request is not debug-logged - confirm that this is intentional.
	 *
	 * @returns The `version` property of the response body
	 * @throws ApiErrors.DeviceAPIError if the body's `status` is not `success`
	 */
	public getVersion(): Promise<string> {
		const url = this.getUrlForAction('version');

		return DeviceAPI.promisifiedRequest(request.get, {
			url,
			json: true,
		}).then(body => {
			if (body.status !== 'success') {
				throw new ApiErrors.DeviceAPIError(
					'Non-successful response from supervisor version endpoint',
				);
			}

			return body.version;
		});
	}

	/**
	 * Fetch the state snapshot from the status endpoint.
	 *
	 * NOTE(review): no logger is passed to `promisifiedRequest` here, so this
	 * request is not debug-logged - confirm that this is intentional.
	 *
	 * @returns The response body with its `status` property removed
	 * @throws ApiErrors.DeviceAPIError if the body's `status` is not `success`
	 */
	public getStatus(): Promise<Status> {
		const url = this.getUrlForAction('status');

		return DeviceAPI.promisifiedRequest(request.get, {
			url,
			json: true,
		}).then(body => {
			if (body.status !== 'success') {
				throw new ApiErrors.DeviceAPIError(
					'Non-successful response from supervisor status endpoint',
				);
			}

			return _.omit(body, 'status') as Status;
		});
	}

	/**
	 * Open the log endpoint and resolve with the (still streaming) response.
	 *
	 * TCP keepalive is enabled on the underlying socket so that a silently
	 * dropped connection is eventually detected and the stream closes.
	 *
	 * @returns A readable stream of the log endpoint's response body
	 * @throws ApiErrors.DeviceAPIError (as a rejection) on a non-200 response;
	 * request errors and keepalive setup failures are rejected as-is
	 */
	public getLogStream(): Bluebird<Stream.Readable> {
		const url = this.getUrlForAction('logs');

		// Don't use the promisified version here as we want to stream the output
		return new Bluebird((resolve, reject) => {
			const req = request.get(url);

			req.on('error', reject).on('response', async res => {
				if (res.statusCode !== 200) {
					reject(
						new ApiErrors.DeviceAPIError(
							'Non-200 response from log streaming endpoint',
						),
					);
					// Nothing more to do for a failed request: don't touch the
					// socket and don't fall through to resolve()
					return;
				}

				try {
					res.socket.setKeepAlive(true, 1000);
					if (os.platform() !== 'win32') {
						const NetKeepalive = await import('net-keepalive');
						// Certain versions of typescript won't convert
						// this automatically
						const sock = (res.socket as any) as NodeJSSocketWithFileDescriptor;
						// We send a tcp keepalive probe once every 5 seconds
						NetKeepalive.setKeepAliveInterval(sock, 5000);
						// After 5 failed probes, the connection is marked as
						// closed
						NetKeepalive.setKeepAliveProbes(sock, 5);
					}
				} catch (err) {
					// This handler is async, so an exception here would otherwise
					// become an unhandled rejection and leave the returned
					// promise pending forever
					reject(err);
					return;
				}

				resolve(res);
			});
		});
	}

	/**
	 * Build the absolute URL for one of the known device endpoints.
	 *
	 * @param action Key of the endpoint in `deviceEndpoints`
	 */
	private getUrlForAction(action: keyof typeof deviceEndpoints): string {
		return `${this.deviceAddress}${deviceEndpoints[action]}`;
	}

	/**
	 * A helper method for promisifying general (non-streaming) requests.
	 * Streaming requests should use a separate setup.
	 *
	 * @param requestMethod Callback-style request function (e.g. `request.get`)
	 * @param opts Options (or URL string) handed to `requestMethod` unchanged
	 * @param logger When given, the request URL is written to the debug log
	 * @returns The response body of a 200 response
	 * @throws ApiErrors.BadRequestDeviceAPIError on a 400 response
	 * @throws ApiErrors.ServiceUnavailableAPIError on a 503 response
	 * @throws ApiErrors.DeviceAPIError on any other non-200 response
	 */
	private static async promisifiedRequest<T>(
		requestMethod: (
			opts: T,
			cb: (err?: any, res?: any, body?: any) => void,
		) => void,
		opts: T,
		logger?: Logger,
	): Promise<any> {
		interface ObjectWithUrl {
			url?: string;
		}

		if (logger != null) {
			let url: string | null = null;
			if (_.isObject(opts) && (opts as ObjectWithUrl).url != null) {
				// the `as string` shouldn't be necessary, but the type system
				// is getting a little confused
				url = (opts as ObjectWithUrl).url as string;
			} else if (_.isString(opts)) {
				url = opts;
			}

			if (url != null) {
				logger.logDebug(`Sending request to ${url}`);
			}
		}

		return Bluebird.fromCallback<[request.Response, { message: string }]>(
			cb => {
				return requestMethod(opts, cb);
			},
			{ multiArgs: true },
		).then(([response, body]) => {
			if (response.statusCode === 200) {
				return body;
			}

			// The body of an error response may be missing or not an object
			// (e.g. an empty or non-JSON reply), so read the message defensively
			// rather than throwing a TypeError in place of the API error
			const message = _.get(body, 'message');

			switch (response.statusCode) {
				case 400:
					throw new ApiErrors.BadRequestDeviceAPIError(message);
				case 503:
					throw new ApiErrors.ServiceUnavailableAPIError(message);
				default:
					throw new ApiErrors.DeviceAPIError(message);
			}
		});
	}
}

export default DeviceAPI;