Merge pull request #1066 from balena-io/extra-step-engine

Extract composition step engine to typescript module
This commit is contained in:
CameronDiver 2019-08-19 21:03:03 +01:00 committed by GitHub
commit 408ddaa477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 353 additions and 106 deletions

View File

@ -23,6 +23,7 @@ updateLock = require './lib/update-lock'
{ Network } = require './compose/network'
{ VolumeManager } = require './compose/volume-manager'
{ Volume } = require './compose/volume'
compositionSteps = require './compose/composition-steps'
Proxyvisor = require './proxyvisor'
@ -82,87 +83,30 @@ module.exports = class ApplicationManager extends EventEmitter
if changedConfig.appUpdatePollInterval
@images.appUpdatePollInterval = changedConfig.appUpdatePollInterval
@actionExecutors = {
stop: (step, { force = false, skipLock = false } = {}) =>
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
wait = step.options?.wait ? false
@services.kill(step.current, { removeContainer: false, wait })
.then =>
delete @_containerStarted[step.current.containerId]
kill: (step, { force = false, skipLock = false } = {}) =>
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
@services.kill(step.current)
.then =>
delete @_containerStarted[step.current.containerId]
if step.options?.removeImage
@images.removeByDockerId(step.current.config.image)
remove: (step) =>
# Only called for dead containers, so no need to take locks or anything
@services.remove(step.current)
updateMetadata: (step, { force = false, skipLock = false } = {}) =>
skipLock or= checkTruthy(step.current.config.labels['io.balena.legacy-container'])
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
@services.updateMetadata(step.current, step.target)
restart: (step, { force = false, skipLock = false } = {}) =>
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
@services.kill(step.current, { wait: true })
.then =>
delete @_containerStarted[step.current.containerId]
.then =>
@services.start(step.target)
.then (container) =>
@_containerStarted[container.id] = true
stopAll: (step, { force = false, skipLock = false } = {}) =>
@stopAll({ force, skipLock })
start: (step) =>
@services.start(step.target)
.then (container) =>
@_containerStarted[container.id] = true
updateCommit: (step) =>
@config.set({ currentCommit: step.target })
handover: (step, { force = false, skipLock = false } = {}) =>
@_lockingIfNecessary step.current.appId, { force, skipLock: skipLock or step.options?.skipLock }, =>
@services.handover(step.current, step.target)
fetch: (step) =>
startTime = process.hrtime()
@fetchesInProgress += 1
Promise.join(
@config.get('fetchOptions')
@images.getAvailable()
(opts, availableImages) =>
opts.deltaSource = @bestDeltaSource(step.image, availableImages)
@images.triggerFetch step.image, opts, (success) =>
@fetchesInProgress -= 1
elapsed = process.hrtime(startTime)
elapsedMs = elapsed[0] * 1000 + elapsed[1] / 1e6
@timeSpentFetching += elapsedMs
if success
# update_downloaded is true if *any* image has been downloaded,
# and it's relevant mostly for the legacy GET /v1/device endpoint
# that assumes a single-container app
@reportCurrentState(update_downloaded: true)
, step.serviceName
)
removeImage: (step) =>
@images.remove(step.image)
saveImage: (step) =>
@images.save(step.image)
cleanup: (step) =>
@config.get('localMode').then (localMode) =>
if !localMode
@images.cleanup()
createNetworkOrVolume: (step) =>
if step.model is 'network'
@networks.create(step.target)
else
@volumes.create(step.target)
removeNetwork: (step) =>
@networks.remove(step.current)
removeVolume: (step) =>
@volumes.remove(step.current)
ensureSupervisorNetwork: =>
@networks.ensureSupervisorNetwork()
}
@actionExecutors = compositionSteps.getExecutors({
lockFn: @_lockingIfNecessary,
services: @services,
networks: @networks,
volumes: @volumes,
applications: this,
images: @images,
config: @config,
callbacks: {
containerStarted: (id) =>
@_containerStarted[id] = true
containerKilled: (id) =>
delete @_containerStarted[id]
fetchStart: =>
@fetchesInProgress += 1
fetchEnd: =>
@fetchesInProgress -= 1
fetchTime: (time) =>
@timeSpentFetching += time
stateReport: (state) =>
@reportCurrentState(state)
bestDeltaSource: @bestDeltaSource
}
})
@validActions = _.keys(@actionExecutors).concat(@proxyvisor.validActions)
@router = createApplicationManagerRouter(this)
@images.on('change', @reportCurrentState)
@ -963,7 +907,7 @@ module.exports = class ApplicationManager extends EventEmitter
return @proxyvisor.executeStepAction(step)
if !_.includes(@validActions, step.action)
return Promise.reject(new Error("Invalid action #{step.action}"))
@actionExecutors[step.action](step, { force, skipLock })
@actionExecutors[step.action](_.merge({}, step, { force, skipLock }))
getExtraStateForComparison: (currentState, targetState) =>
containerIdsByAppId = {}

View File

@ -76,6 +76,7 @@ export class ApplicationManager extends EventEmitter {
networks: Dictionary<Network>;
}>
>;
public stopAll(opts: { force?: boolean; skipLock?: boolean }): Promise<void>;
public serviceNameFromId(serviceId: number): Bluebird<string>;
}

View File

@ -0,0 +1,302 @@
import * as _ from 'lodash';
import Config from '../config';
import ApplicationManager from '../application-manager';
import Images, { Image } from './images';
import Network from './network';
import Service from './service';
import ServiceManager from './service-manager';
import Volume from './volume';
import { checkTruthy } from '../lib/validation';
import { NetworkManager } from './network-manager';
import VolumeManager from './volume-manager';
interface BaseCompositionStepArgs {
force?: boolean;
skipLock?: boolean;
}
// FIXME: Most of the steps take the
// BaseCompositionStepArgs, but some also take an options
// structure which includes some of the same fields. It
// would be nice to remove the need for this
interface CompositionStepArgs {
stop: {
current: Service;
options?: {
skipLock?: boolean;
wait?: boolean;
};
} & BaseCompositionStepArgs;
kill: {
current: Service;
options?: {
skipLock?: boolean;
wait?: boolean;
removeImage?: boolean;
};
} & BaseCompositionStepArgs;
remove: {
current: Service;
} & BaseCompositionStepArgs;
updateMetadata: {
current: Service;
target: { imageId: number; releaseId: number };
options?: {
skipLock?: boolean;
};
} & BaseCompositionStepArgs;
restart: {
current: Service;
target: Service;
options?: {
skipLock?: boolean;
};
} & BaseCompositionStepArgs;
stopAll: BaseCompositionStepArgs;
start: {
target: Service;
} & BaseCompositionStepArgs;
updateCommit: {
target: string;
};
handover: {
current: Service;
target: Service;
options?: {
skipLock?: boolean;
};
} & BaseCompositionStepArgs;
fetch: {
image: Image;
serviceName: string;
};
removeImage: {
image: Image;
};
saveImage: {
image: Image;
};
cleanup: {};
createNetwork: {
target: Network;
};
createVolume: {
target: Volume;
};
removeNetwork: {
current: Network;
};
removeVolume: {
current: Volume;
};
ensureSupervisorNetwork: {};
}
type CompositionStepAction = keyof CompositionStepArgs;
type CompositionStep<T extends CompositionStepAction> = {
step: T;
} & CompositionStepArgs[T];
export function generateStep<T extends CompositionStepAction>(
action: T,
args: CompositionStepArgs[T],
): CompositionStep<T> {
return {
step: action,
...args,
};
}
type Executors<T extends CompositionStepAction> = {
[key in T]: (step: CompositionStep<key>) => Promise<unknown>
};
type LockingFn = (
// TODO: Once the entire codebase is typescript, change
// this to number
app: number | null,
args: BaseCompositionStepArgs,
fn: () => Promise<unknown>,
) => Promise<unknown>;
interface CompositionCallbacks {
// TODO: Once the entire codebase is typescript, change
// this to number
containerStarted: (containerId: string | null) => void;
containerKilled: (containerId: string | null) => void;
fetchStart: () => void;
fetchEnd: () => void;
fetchTime: (time: number) => void;
stateReport: (state: Dictionary<unknown>) => Promise<void>;
bestDeltaSource: (image: Image, available: Image[]) => string | null;
}
export function getExecutors(app: {
lockFn: LockingFn;
services: ServiceManager;
networks: NetworkManager;
volumes: VolumeManager;
applications: ApplicationManager;
images: Images;
config: Config;
callbacks: CompositionCallbacks;
}) {
const executors: Executors<CompositionStepAction> = {
stop: step => {
return app.lockFn(
step.current.appId,
{
force: step.force,
skipLock: step.skipLock || _.get(step, ['options', 'skipLock']),
},
async () => {
const wait = _.get(step, ['options', 'wait'], false);
await app.services.kill(step.current, {
removeContainer: false,
wait,
});
app.callbacks.containerKilled(step.current.containerId);
},
);
},
kill: step => {
return app.lockFn(
step.current.appId,
{
force: step.force,
skipLock: step.skipLock || _.get(step, ['options', 'skipLock']),
},
async () => {
await app.services.kill(step.current);
app.callbacks.containerKilled(step.current.containerId);
if (_.get(step, ['options', 'removeImage'])) {
await app.images.removeByDockerId(step.current.config.image);
}
},
);
},
remove: async step => {
// Only called for dead containers, so no need to
// take locks
await app.services.remove(step.current);
},
updateMetadata: step => {
const skipLock =
step.skipLock ||
checkTruthy(step.current.config.labels['io.balena.legacy-container']);
return app.lockFn(
step.current.appId,
{
force: step.force,
skipLock: skipLock || _.get(step, ['options', 'skipLock']),
},
async () => {
await app.services.updateMetadata(step.current, step.target);
},
);
},
restart: step => {
return app.lockFn(
step.current.appId,
{
force: step.force,
skipLock: step.skipLock || _.get(step, ['options', 'skipLock']),
},
async () => {
await app.services.kill(step.current, { wait: true });
app.callbacks.containerKilled(step.current.containerId);
const container = await app.services.start(step.target);
app.callbacks.containerStarted(container.id);
},
);
},
stopAll: async step => {
await app.applications.stopAll({
force: step.force,
skipLock: step.skipLock,
});
},
start: async step => {
const container = await app.services.start(step.target);
app.callbacks.containerStarted(container.id);
},
updateCommit: async step => {
await app.config.set({ currentCommit: step.target });
},
handover: step => {
return app.lockFn(
step.current.appId,
{
force: step.force,
skipLock: step.skipLock || _.get(step, ['options', 'skipLock']),
},
async () => {
await app.services.handover(step.current, step.target);
},
);
},
fetch: async step => {
const startTime = process.hrtime();
app.callbacks.fetchStart();
const [fetchOpts, availableImages] = await Promise.all([
app.config.get('fetchOptions'),
app.images.getAvailable(),
]);
const opts = {
deltaSource: app.callbacks.bestDeltaSource(step.image, availableImages),
...fetchOpts,
};
await app.images.triggerFetch(
step.image,
opts,
async success => {
app.callbacks.fetchEnd();
const elapsed = process.hrtime(startTime);
const elapsedMs = elapsed[0] * 1000 + elapsed[1] / 1e6;
app.callbacks.fetchTime(elapsedMs);
if (success) {
// update_downloaded is true if *any* image has
// been downloaded ,and it's relevant mostly for
// the legacy GET /v1/device endpoint that assumes
// a single container app
await app.callbacks.stateReport({ update_downloaded: true });
}
},
step.serviceName,
);
},
removeImage: async step => {
await app.images.remove(step.image);
},
saveImage: async step => {
await app.images.save(step.image);
},
cleanup: async () => {
const localMode = await app.config.get('localMode');
if (!localMode) {
await app.images.cleanup();
}
},
createNetwork: async step => {
await app.networks.create(step.target);
},
createVolume: async step => {
await app.volumes.create(step.target);
},
removeNetwork: async step => {
await app.networks.remove(step.current);
},
removeVolume: async step => {
await app.volumes.remove(step.current);
},
ensureSupervisorNetwork: async () => {
app.networks.ensureSupervisorNetwork();
},
};
return executors;
}

View File

@ -262,7 +262,7 @@ export class Images extends (EventEmitter as new () => ImageEventEmitter) {
);
}
public async getAvailable(_localMode: boolean): Promise<Image[]> {
public async getAvailable(): Promise<Image[]> {
const images = await this.withImagesFromDockerAndDB(
(dockerImages, supervisedImages) =>
_.filter(supervisedImages, image =>
@ -320,8 +320,8 @@ export class Images extends (EventEmitter as new () => ImageEventEmitter) {
.whereIn('id', ids);
}
public async getStatus(localMode: boolean) {
const images = await this.getAvailable(localMode);
public async getStatus() {
const images = await this.getAvailable();
for (const image of images) {
image.status = 'Downloaded';
image.downloadProgress = null;

View File

@ -235,3 +235,5 @@ export class Network {
return `${appId}_${name}`;
}
}
export default Network;

View File

@ -1027,3 +1027,5 @@ export class Service {
];
}
}
export default Service;

View File

@ -157,10 +157,9 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
router.get('/v2/applications/state', async (_req: Request, res: Response) => {
// It's kinda hacky to access the services and db via the application manager
// maybe refactor this code
const localMode = await deviceState.config.get('localMode');
Bluebird.join(
applications.services.getStatus(),
applications.images.getStatus(localMode),
applications.images.getStatus(),
applications.db.models('app').select(['appId', 'commit', 'name']),
(
services,
@ -431,7 +430,6 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
});
router.get('/v2/state/status', async (_req, res) => {
const localMode = await applications.config.get('localMode');
const currentRelease = await applications.config.get('currentCommit');
const pending = applications.deviceState.applyInProgress;
@ -450,24 +448,22 @@ export function createV2Api(router: Router, applications: ApplicationManager) {
let downloadProgressTotal = 0;
let downloads = 0;
const imagesStates = (await applications.images.getStatus(localMode)).map(
img => {
if (img.downloadProgress != null) {
downloadProgressTotal += img.downloadProgress;
downloads += 1;
}
return _.pick(
img,
'name',
'appId',
'serviceName',
'imageId',
'dockerImageId',
'status',
'downloadProgress',
);
},
);
const imagesStates = (await applications.images.getStatus()).map(img => {
if (img.downloadProgress != null) {
downloadProgressTotal += img.downloadProgress;
downloads += 1;
}
return _.pick(
img,
'name',
'appId',
'serviceName',
'imageId',
'dockerImageId',
'status',
'downloadProgress',
);
});
let overallDownloadProgress = null;
if (downloads > 0) {