From 02736113a394563dbb265cfe206e8e2aa65f9e6d Mon Sep 17 00:00:00 2001 From: Cameron Diver Date: Wed, 23 Jan 2019 19:24:57 +0000 Subject: [PATCH] refactor: Convert docker-utils module to typescript Change-type: patch Closes: #868 Signed-off-by: Cameron Diver --- src/application-manager.coffee | 5 +- src/compose/images.ts | 30 +-- src/compose/network-manager.ts | 2 +- src/compose/network.ts | 2 +- src/compose/volumes.ts | 5 +- src/lib/docker-utils.coffee | 168 ----------------- src/lib/docker-utils.d.ts | 41 ---- src/lib/docker-utils.ts | 331 +++++++++++++++++++++++++++++++++ src/lib/request.ts | 2 + src/logger.ts | 2 +- typings/docker-delta.d.ts | 12 ++ 11 files changed, 360 insertions(+), 240 deletions(-) delete mode 100644 src/lib/docker-utils.coffee delete mode 100644 src/lib/docker-utils.d.ts create mode 100644 src/lib/docker-utils.ts create mode 100644 typings/docker-delta.d.ts diff --git a/src/application-manager.coffee b/src/application-manager.coffee index 04b18581..9bed15a6 100644 --- a/src/application-manager.coffee +++ b/src/application-manager.coffee @@ -8,7 +8,7 @@ path = require 'path' constants = require './lib/constants' -Docker = require './lib/docker-utils' +{ DockerUtils: Docker } = require './lib/docker-utils' { LocalModeManager } = require './local-mode' updateLock = require './lib/update-lock' { checkTruthy, checkInt, checkString } = require './lib/validation' @@ -689,8 +689,7 @@ module.exports = class ApplicationManager extends EventEmitter normaliseAndExtendAppFromDB: (app) => Promise.join( @config.get('extendedEnvOptions') - @docker.getNetworkGateway(constants.supervisorNetworkInterface) - .catchReturn('127.0.0.1') + @docker.getNetworkGateway(constants.supervisorNetworkInterface).catch(-> '127.0.0.1') Promise.props({ firmware: pathExistsOnHost('/lib/firmware') modules: pathExistsOnHost('/lib/modules') diff --git a/src/compose/images.ts b/src/compose/images.ts index bf57a028..bee71c0f 100644 --- a/src/compose/images.ts +++ b/src/compose/images.ts @@ -4,10 +4,13 @@ import { EventEmitter } from 'events'; import * as _ from 'lodash'; import StrictEventEmitter from 'strict-event-emitter-types'; -import { SchemaReturn } from '../config/schema-type'; import Database from '../db'; import * as constants from '../lib/constants'; -import DockerUtils = require('../lib/docker-utils'); +import { + DeltaFetchOptions, + DockerUtils, + FetchOptions, +} from '../lib/docker-utils'; import { DeltaStillProcessingError, NotFoundError } from '../lib/errors'; import * as LogTypes from '../lib/log-types'; import * as validation from '../lib/validation'; @@ -29,25 +32,6 @@ interface FetchProgressEvent { percentage: number; } -// TODO: This is copied from src/lib/docker-utils.d.ts but because of the -// export mechanism used, we can't export it. Once we convert docker-utils -// to typescript, remove this -interface DeltaFetchOptions { - deltaRequestTimeout: number; - deltaApplyTimeout: number; - deltaRetryCount: number; - deltaRetryInterval: number; - uuid: string; - currentApiKey: string; - deltaEndpoint: string; - apiEndpoint: string; - deltaSource: string; - deltaSourceId: string; - deltaVersion: string; -} - -type FetchOptions = SchemaReturn<'fetchOptions'> & { deltaSource?: string }; - export interface Image { id: number; // image registry/repo@digest or registry/repo:tag @@ -133,7 +117,7 @@ export class Images extends (EventEmitter as { try { let id; - if (opts.delta && opts.deltaSource != null) { + if (opts.delta && (opts as DeltaFetchOptions).deltaSource != null) { id = await this.fetchDelta(image, opts, onProgress); } else { id = await this.fetchImage(image, opts, onProgress); @@ -598,7 +582,7 @@ export class Images extends (EventEmitter as { image: Image, opts: FetchOptions, onProgress: (evt: FetchProgressEvent) => void, - ): Bluebird { + ): Promise { this.logger.logSystemEvent(LogTypes.downloadImage, { image }); return this.docker.fetchImageWithProgress(image.name, opts, onProgress); } diff --git a/src/compose/network-manager.ts b/src/compose/network-manager.ts index 59cea657..c2b7a1d3 100644 --- a/src/compose/network-manager.ts +++ b/src/compose/network-manager.ts @@ -3,7 +3,7 @@ import { fs } from 'mz'; import * as _ from 'lodash'; import * as constants from '../lib/constants'; -import Docker = require('../lib/docker-utils'); +import Docker from '../lib/docker-utils'; import { ENOENT, NotFoundError } from '../lib/errors'; import { Logger } from '../logger'; import { Network, NetworkOptions } from './network'; diff --git a/src/compose/network.ts b/src/compose/network.ts index 99d4de43..02f5d9b1 100644 --- a/src/compose/network.ts +++ b/src/compose/network.ts @@ -1,7 +1,7 @@ import * as Bluebird from 'bluebird'; import * as _ from 'lodash'; -import Docker = require('../lib/docker-utils'); +import Docker from '../lib/docker-utils'; import { InvalidAppIdError, NotFoundError } from '../lib/errors'; import logTypes = require('../lib/log-types'); import { checkInt } from '../lib/validation'; diff --git a/src/compose/volumes.ts b/src/compose/volumes.ts index 64d79369..503e17c0 100644 --- a/src/compose/volumes.ts +++ b/src/compose/volumes.ts @@ -2,7 +2,7 @@ import * as Dockerode from 'dockerode'; import * as _ from 'lodash'; import * as path from 'path'; -import Docker = require('../lib/docker-utils'); +import Docker from '../lib/docker-utils'; import Logger from '../logger'; import constants = require('../lib/constants'); @@ -205,7 +205,8 @@ export class Volumes { private async listWithBothLabels(): Promise { // We have to cast the listVolumes call from any[] to any below, until the // relevant PR: https://github.com/DefinitelyTyped/DefinitelyTyped/pull/32383 - // is merged and released + // is merged and released - we can also replace Dockerode here with the Docker + // DockerUtils class imported above const [legacyResponse, currentResponse]: [ Dockerode.VolumeInfoList, Dockerode.VolumeInfoList diff --git a/src/lib/docker-utils.coffee b/src/lib/docker-utils.coffee deleted file mode 100644 index 11a9a25d..00000000 --- a/src/lib/docker-utils.coffee +++ /dev/null @@ -1,168 +0,0 @@ -constants = require './constants' -DockerToolbelt = require 'docker-toolbelt' -{ DockerProgress } = require 'docker-progress' -Promise = require 'bluebird' -dockerDelta = require 'docker-delta' -_ = require 'lodash' -{ request, resumable } = require './request' -{ envArrayToObject } = require './conversions' -{ DeltaStillProcessingError, InvalidNetGatewayError } = require './errors' -{ checkInt } = require './validation' - -applyRsyncDelta = (imgSrc, deltaUrl, applyTimeout, opts, onProgress, log) -> - log('Applying rsync delta...') - new Promise (resolve, reject) -> - req = resumable(Object.assign({ url: deltaUrl }, opts)) - .on('progress', onProgress) - .on('retry', onProgress) - .on('error', reject) - .on 'response', (res) -> - if res.statusCode isnt 200 - reject(new Error("Got #{res.statusCode} when requesting delta from storage.")) - else if parseInt(res.headers['content-length']) is 0 - reject(new Error('Invalid delta URL.')) - else - deltaStream = dockerDelta.applyDelta(imgSrc, { log, timeout: applyTimeout }) - res.pipe(deltaStream) - .on('id', (id) -> resolve('sha256:' + id)) - .on 'error', (err) -> - log("Delta stream emitted error: #{err}") - req.abort(err) - reject(err) - -applyBalenaDelta = (docker, deltaImg, token, onProgress, log) -> - log('Applying balena delta...') - if token? - log('Using registry auth token') - auth = { authconfig: registrytoken: token } - docker.dockerProgress.pull(deltaImg, onProgress, auth) - .then -> - docker.getImage(deltaImg).inspect().get('Id') - -module.exports = class DockerUtils extends DockerToolbelt - constructor: (opts) -> - super(opts) - @dockerProgress = new DockerProgress(dockerToolbelt: this) - @supervisorTagPromise = @normaliseImageName(constants.supervisorImage) - - getRepoAndTag: (image) => - @getRegistryAndName(image) - .then ({ registry, imageName, tagName }) -> - if registry? - registry = registry.toString().replace(':443', '') - repoName = "#{registry}/#{imageName}" - else - repoName = imageName - return { repo: repoName, tag: tagName } - - fetchDeltaWithProgress: (imgDest, fullDeltaOpts, onProgress) => - { - deltaRequestTimeout, deltaApplyTimeout, deltaRetryCount, deltaRetryInterval, - uuid, currentApiKey, deltaEndpoint, apiEndpoint, - deltaSource, deltaSourceId, deltaVersion - } = fullDeltaOpts - retryCount = checkInt(deltaRetryCount) - retryInterval = checkInt(deltaRetryInterval) - requestTimeout = checkInt(deltaRequestTimeout) - applyTimeout = checkInt(deltaApplyTimeout) - version = checkInt(deltaVersion) - deltaSourceId ?= deltaSource - - log = (str) -> - console.log("delta(#{deltaSource}): #{str}") - - if not (version in [ 2, 3 ]) - log("Unsupported delta version: #{version}. Falling back to regular pull") - return @fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress) - - # Since the supervisor never calls this function without a source anymore, - # this should never happen, but we handle it anyways. - if !deltaSource? - log('Falling back to regular pull due to lack of a delta source') - return @fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress) - - docker = this - - log("Starting delta to #{imgDest}") - Promise.join @getRegistryAndName(imgDest), @getRegistryAndName(deltaSource), (dstInfo, srcInfo) -> - tokenEndpoint = "#{apiEndpoint}/auth/v1/token" - opts = - auth: - user: 'd_' + uuid - pass: currentApiKey - sendImmediately: true - json: true - timeout: requestTimeout - url = "#{tokenEndpoint}?service=#{dstInfo.registry}&scope=repository:#{dstInfo.imageName}:pull&scope=repository:#{srcInfo.imageName}:pull" - request.getAsync(url, opts) - .get(1) - .then (responseBody) -> - token = responseBody?.token - opts = - followRedirect: false - timeout: requestTimeout - - if token? - opts.auth = - bearer: token - sendImmediately: true - request.getAsync("#{deltaEndpoint}/api/v#{version}/delta?src=#{deltaSource}&dest=#{imgDest}", opts) - .spread (res, data) -> - if res.statusCode in [ 502, 504 ] - throw new DeltaStillProcessingError() - switch version - when 2 - if not (300 <= res.statusCode < 400 and res.headers['location']?) - throw new Error("Got #{res.statusCode} when requesting image from delta server.") - deltaUrl = res.headers['location'] - if !deltaSource? - deltaSrc = null - else - deltaSrc = deltaSourceId - resumeOpts = { timeout: requestTimeout, maxRetries: retryCount, retryInterval } - applyRsyncDelta(deltaSrc, deltaUrl, applyTimeout, resumeOpts, onProgress, log) - when 3 - if res.statusCode isnt 200 - throw new Error("Got #{res.statusCode} when requesting image from delta server.") - name = JSON.parse(data).name - applyBalenaDelta(docker, name, token, onProgress, log) - else - # we guard against arbitrary versions above, so this can't really happen - throw new Error("Unsupported delta version: #{version}") - .catch dockerDelta.OutOfSyncError, (err) => - log('Falling back to regular pull') - @fetchImageWithProgress(imgDest, fullDeltaOpts, onProgress) - .tap -> - log('Delta applied successfully') - .tapCatch (err) -> - log("Delta failed with: #{err}") - - - fetchImageWithProgress: (image, { uuid, currentApiKey }, onProgress) => - @getRegistryAndName(image) - .then ({ registry }) => - dockerOptions = - authconfig: - username: 'd_' + uuid, - password: currentApiKey, - serveraddress: registry - @dockerProgress.pull(image, onProgress, dockerOptions) - .then => - @getImage(image).inspect().get('Id') - - getImageEnv: (id) -> - @getImage(id).inspect() - .get('Config').get('Env') - .then(envArrayToObject) - .catch (err) -> - console.log('Error getting env from image', err, err.stack) - return {} - - getNetworkGateway: (netName) => - return Promise.resolve('127.0.0.1') if netName == 'host' - @getNetwork(netName).inspect() - .then (netInfo) -> - conf = netInfo?.IPAM?.Config?[0] - return conf.Gateway if conf?.Gateway? - return conf.Subnet.replace('.0/16', '.1') if _.endsWith(conf?.Subnet, '.0/16') - throw new InvalidNetGatewayError("Cannot determine network gateway for #{netName}") diff --git a/src/lib/docker-utils.d.ts b/src/lib/docker-utils.d.ts deleted file mode 100644 index 59bec75d..00000000 --- a/src/lib/docker-utils.d.ts +++ /dev/null @@ -1,41 +0,0 @@ -import * as Bluebird from 'bluebird'; -import DockerToolbelt = require('docker-toolbelt'); -import { SchemaReturn } from '../config/schema-type'; - -// This is the EnvVarObject from src/lib/types, but it seems we cannot -// reference it relatively. Just redefine it as it's simple and won't change -// often - -interface EnvVarObject { - [name: string]: string; -} - -interface TaggedRepoImage { - repo: string; - tag: string; -} - -type FetchOptions = SchemaReturn<'fetchOptions'>; - -declare class DockerUtils extends DockerToolbelt { - constructor(opts: any); - - getRepoAndTag(image: string): Bluebird; - - fetchDeltaWithProgress( - imgDest: string, - fullDeltaOpts: any, - onProgress: (args: any) => void, - ): Bluebird; - - fetchImageWithProgress( - image: string, - config: FetchOptions, - onProgress: (args: any) => void, - ): Bluebird; - - getImageEnv(id: string): Bluebird; - getNetworkGateway(netName: string): Bluebird; -} - -export = DockerUtils; diff --git a/src/lib/docker-utils.ts b/src/lib/docker-utils.ts new file mode 100644 index 00000000..9a769424 --- /dev/null +++ b/src/lib/docker-utils.ts @@ -0,0 +1,331 @@ +import { DockerProgress, ProgressCallback } from 'docker-progress'; +import * as Dockerode from 'dockerode'; +import * as _ from 'lodash'; + +import { applyDelta, OutOfSyncError } from 'docker-delta'; +import DockerToolbelt = require('docker-toolbelt'); + +import { SchemaReturn } from '../config/schema-type'; +import { envArrayToObject } from './conversions'; +import { + DeltaStillProcessingError, + InternalInconsistencyError, + InvalidNetGatewayError, +} from './errors'; +import { request, requestLib, resumable } from './request'; +import { EnvVarObject } from './types'; + +export type FetchOptions = SchemaReturn<'fetchOptions'>; +export type DeltaFetchOptions = FetchOptions & { + deltaSourceId: string; + deltaSource: string; +}; + +interface RsyncApplyOptions { + timeout: number; + maxRetries: number; + retryInterval: number; +} + +export class DockerUtils extends DockerToolbelt { + public dockerProgress: DockerProgress; + + public constructor(opts: Dockerode.DockerOptions) { + super(opts); + this.dockerProgress = new DockerProgress({ dockerToolbelt: this }); + } + + public async getRepoAndTag( + image: string, + ): Promise<{ repo: string; tag: string }> { + const { registry, imageName, tagName } = await this.getRegistryAndName( + image, + ); + + let repoName = imageName; + + if (registry != null) { + repoName = `${registry}/${imageName}`; + } + + return { repo: repoName, tag: tagName }; + } + + public async fetchDeltaWithProgress( + imgDest: string, + deltaOpts: DeltaFetchOptions, + onProgress: ProgressCallback, + ): Promise { + const deltaSourceId = + deltaOpts.deltaSourceId != null + ? deltaOpts.deltaSourceId + : deltaOpts.deltaSource; + + const timeout = deltaOpts.deltaApplyTimeout; + + if (timeout == null) { + throw new InternalInconsistencyError( + 'A delta apply timeout is not set in fetchDeltaWithProgress!', + ); + } + + const log = (str: string) => + console.log(`delta(${deltaOpts.deltaSource}): ${str}`); + + if (!_.includes([2, 3], deltaOpts.deltaVersion)) { + log( + `Unsupported delta version: ${ + deltaOpts.deltaVersion + }. Failling back to regular pull`, + ); + return await this.fetchImageWithProgress(imgDest, deltaOpts, onProgress); + } + + // Since the supevisor never calls this function with a source anymore, + // this should never happen, but w ehandle it anyway + if (deltaOpts.deltaSource == null) { + log('Falling back to regular pull due to lack of a delta source'); + return this.fetchImageWithProgress(imgDest, deltaOpts, onProgress); + } + + const docker = this; + log(`Starting delta to ${imgDest}`); + + const [dstInfo, srcInfo] = await Promise.all([ + this.getRegistryAndName(imgDest), + this.getRegistryAndName(deltaOpts.deltaSource), + ]); + + const tokenEndpoint = `${deltaOpts.apiEndpoint}/auth/v1/token`; + const tokenOpts: requestLib.CoreOptions = { + auth: { + user: `d_${deltaOpts.uuid}`, + pass: deltaOpts.currentApiKey, + sendImmediately: true, + }, + }; + const tokenUrl = `${tokenEndpoint}?service=${ + dstInfo.registry + }&scope=repository:${dstInfo.imageName}:pull&scope=repository:${ + srcInfo.imageName + }:pull`; + + const tokenResponseBody = (await request.getAsync(tokenUrl, tokenOpts))[1]; + const token = tokenResponseBody != null ? tokenResponseBody.token : null; + + const opts: requestLib.CoreOptions = { + followRedirect: false, + timeout: deltaOpts.deltaRequestTimeout, + }; + + if (token != null) { + opts.auth = { + bearer: token, + sendImmediately: true, + }; + } + + const url = `${deltaOpts.deltaEndpoint}/api/v${ + deltaOpts.deltaVersion + }/delta?src=${deltaOpts.deltaSource}&dest=${imgDest}`; + + const [res, data] = await request.getAsync(url, opts); + if (res.statusCode === 502 || res.statusCode === 504) { + throw new DeltaStillProcessingError(); + } + let id: string; + try { + switch (deltaOpts.deltaVersion) { + case 2: + if ( + !( + res.statusCode >= 300 && + res.statusCode < 400 && + res.headers['location'] != null + ) + ) { + throw new Error( + `Got ${res.statusCode} when request image from delta server.`, + ); + } + const deltaUrl = res.headers['location']; + const deltaSrc = deltaSourceId; + const resumeOpts = { + timeout: deltaOpts.deltaRequestTimeout, + maxRetries: deltaOpts.deltaRetryCount, + retryInterval: deltaOpts.deltaRetryInterval, + }; + id = await DockerUtils.applyRsyncDelta( + deltaSrc, + deltaUrl, + timeout, + resumeOpts, + onProgress, + log, + ); + break; + case 3: + if (res.statusCode !== 200) { + throw new Error( + `Got ${ + res.statusCode + } when requesting v3 delta from delta server.`, + ); + } + let name; + try { + name = JSON.parse(data).name; + } catch (e) { + throw new Error( + `Got an error when parsing delta server response for v3 delta: ${e}`, + ); + } + id = await DockerUtils.applyBalenaDelta( + docker, + name, + token, + onProgress, + log, + ); + break; + default: + throw new Error( + `Unsupposed delta version: ${deltaOpts.deltaVersion}`, + ); + } + } catch (e) { + if (e instanceof OutOfSyncError) { + log('Falling back to regular pull due to delta out of sync error'); + return await this.fetchImageWithProgress( + imgDest, + deltaOpts, + onProgress, + ); + } else { + log(`Delta failed with ${e}`); + throw e; + } + } + + log(`Delta applied successfully`); + return id; + } + + public async fetchImageWithProgress( + image: string, + { uuid, currentApiKey }: FetchOptions, + onProgress: ProgressCallback, + ): Promise { + const { registry } = await this.getRegistryAndName(image); + + const dockerOpts = { + authconfig: { + username: `d_${uuid}`, + password: currentApiKey, + serverAddress: registry, + }, + }; + + await this.dockerProgress.pull(image, onProgress, dockerOpts); + return (await this.getImage(image).inspect()).Id; + } + + public async getImageEnv(id: string): Promise { + const inspect = await this.getImage(id).inspect(); + + try { + return envArrayToObject(_.get(inspect, ['Config', 'Env'], [])); + } catch (e) { + console.log('Error getting env from image', e, e.stack); + return {}; + } + } + + public async getNetworkGateway(networkName: string): Promise { + if (networkName === 'host') { + return '127.0.0.1'; + } + + const network = await this.getNetwork(networkName).inspect(); + const config = _.get(network, ['IPAM', 'Config', '0']); + if (config != null) { + if (config.Gateway != null) { + return config.Gateway; + } + if (config.Subnet != null && _.endsWith(config.Subnet, '.0/16')) { + return config.Subnet.replace('.0/16', '.1'); + } + } + throw new InvalidNetGatewayError( + `Cannot determine network gateway for ${networkName}`, + ); + } + + private static applyRsyncDelta( + imgSrc: string, + deltaUrl: string, + applyTimeout: number, + opts: RsyncApplyOptions, + onProgress: ProgressCallback, + log: (str: string) => void, + ): Promise { + log('Applying rsync delta...'); + + return new Promise((resolve, reject) => { + const req = resumable(Object.assign({ url: deltaUrl }, opts)); + req + .on('progress', onProgress) + .on('retry', onProgress) + .on('error', reject) + .on('response', res => { + if (res.statusCode !== 200) { + reject( + new Error( + `Got ${res.statusCode} when requesting delta from storage.`, + ), + ); + } else if (parseInt(res.headers['content-length'] || '0', 10) === 0) { + reject(new Error('Invalid delta URL')); + } else { + const deltaStream = applyDelta(imgSrc, { + log, + timeout: applyTimeout, + }); + res + .pipe(deltaStream) + .on('id', id => resolve(`sha256:${id}`)) + .on('error', err => { + log(`Delta stream emitted error: ${err}`); + req.abort(); + reject(err); + }); + } + }); + }); + } + + private static async applyBalenaDelta( + docker: DockerUtils, + deltaImg: string, + token: string | null, + onProgress: ProgressCallback, + log: (str: string) => void, + ): Promise { + log('Applying balena delta...'); + + let auth: Dictionary | undefined; + if (token != null) { + log('Using registry auth token'); + auth = { + authconfig: { + registrytoken: token, + }, + }; + } + + await docker.dockerProgress.pull(deltaImg, onProgress, auth); + return (await docker.getImage(deltaImg).inspect()).Id; + } +} + +export default DockerUtils; diff --git a/src/lib/request.ts b/src/lib/request.ts index a1073cd4..15497b79 100644 --- a/src/lib/request.ts +++ b/src/lib/request.ts @@ -7,6 +7,8 @@ import * as osRelease from './os-release'; import supervisorVersion = require('./supervisor-version'); +export { requestLib }; + const osVersion = osRelease.getOSVersionSync(constants.hostOSVersionPath); const osVariant = osRelease.getOSVariantSync(constants.hostOSVersionPath); diff --git a/src/logger.ts b/src/logger.ts index f3f195c9..755905c6 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -3,7 +3,7 @@ import * as es from 'event-stream'; import * as _ from 'lodash'; import { EventTracker } from './event-tracker'; -import Docker = require('./lib/docker-utils'); +import Docker from './lib/docker-utils'; import { LogType } from './lib/log-types'; import { writeLock } from './lib/update-lock'; import { diff --git a/typings/docker-delta.d.ts b/typings/docker-delta.d.ts new file mode 100644 index 00000000..39c25a8a --- /dev/null +++ b/typings/docker-delta.d.ts @@ -0,0 +1,12 @@ +declare module 'docker-delta' { + // Incomplete type definitions + import TypedError = require('typed-error'); + import { Duplex } from 'stream'; + + export class OutOfSyncError extends TypedError {} + + export function applyDelta( + imageSource: string, + opts: { log: (str: string) => void; timeout: number }, + ): Duplex; +}