Refactor image "volatile state" to use state pattern

This replaces stored `volatileState` with a more declarative ImageTask API.
An ImageTask stores volatile image state for operations that cannot be
obtained through an engine query, such as fetching and removing an
image, state that can be updated while the task is running.

Image controller methods can now use the `reportEvent` method to create
and update the state of a longer running task.
This commit is contained in:
Felipe Lalanne 2021-07-21 13:37:25 -04:00
parent 06c3f488e5
commit a1d098d8f3

View File

@ -15,7 +15,6 @@ import {
StatusError,
} from '../lib/errors';
import * as LogTypes from '../lib/log-types';
import * as validation from '../lib/validation';
import * as logger from '../logger';
import { ImageDownloadBackoffError } from './errors';
@ -68,8 +67,90 @@ const imageFetchLastFailureTime: Dictionary<ReturnType<
typeof process.hrtime
>> = {};
const imageCleanupFailures: Dictionary<number> = {};
// A store of volatile state for images (e.g. download progress), indexed by imageId
const volatileState: { [imageId: number]: Image } = {};
type ImageState = Pick<Image, 'status' | 'downloadProgress'>;
type ImageTask = {
// Indicates whether the task has been finished
done?: boolean;
// Current image state of the task
context: Image;
// Update the task with new context. This is a pure function
// meaning it doesn't modify the original task
update: (change?: ImageState) => ImageTaskUpdate;
// Finish the task. This is a pure function
// meaning it doesn't modify the original task
finish: () => ImageTaskUpdate;
};
type ImageTaskUpdate = [ImageTask, boolean];
// Create new running task with the given initial context
function createTask(initialContext: Image) {
// Task has only two state, is either running or finished
const running = (context: Image): ImageTask => {
return {
context,
update: ({ status, downloadProgress }: ImageState) =>
// Keep current state
[
running({
...context,
...(status && { status }),
...(downloadProgress && { downloadProgress }),
}),
// Only mark the task as changed if there is new data
[status, downloadProgress].some((v) => !!v),
],
finish: () => [finished(context), true],
};
};
// Once the task is finished, it cannot go back to a running state
const finished = (context: Image): ImageTask => {
return {
done: true,
context,
update: () => [finished(context), false],
finish: () => [finished(context), false],
};
};
return running(initialContext);
}
const runningTasks: { [imageId: number]: ImageTask } = {};
function reportEvent(event: 'start' | 'update' | 'finish', state: Image) {
const { imageId } = state;
// Emit by default if a start event is reported
let emitChange = event === 'start';
// Get the current task and update it in memory
const currentTask =
event === 'start' ? createTask(state) : runningTasks[imageId];
runningTasks[imageId] = currentTask;
// TODO: should we assert that the current task exists at this point?
// On update, update the corresponding task with the new state if it exists
if (event === 'update' && currentTask) {
const [updatedTask, changed] = currentTask.update(state);
runningTasks[imageId] = updatedTask;
emitChange = changed;
}
// On update, update the corresponding task with the new state if it exists
if (event === 'finish' && currentTask) {
[, emitChange] = currentTask.finish();
delete runningTasks[imageId];
}
if (emitChange) {
events.emit('change');
}
}
let appUpdatePollInterval: number;
@ -125,12 +206,7 @@ export async function triggerFetch(
}
const onProgress = (progress: FetchProgressEvent) => {
// Only report the percentage if we haven't finished fetching
if (volatileState[image.imageId] != null) {
reportChange(image.imageId, {
downloadProgress: progress.percentage,
});
}
reportEvent('update', { ...image, downloadProgress: progress.percentage });
};
let success: boolean;
@ -157,10 +233,13 @@ export async function triggerFetch(
}
throw e;
}
reportChange(
image.imageId,
_.merge(_.clone(image), { status: 'Downloading', downloadProgress: 0 }),
);
// Report a fetch start
reportEvent('start', {
...image,
status: 'Downloading',
downloadProgress: 0,
});
try {
let id;
@ -197,7 +276,7 @@ export async function triggerFetch(
}
}
reportChange(image.imageId);
reportEvent('finish', image);
onFinish(success);
}
@ -280,9 +359,9 @@ export async function getAvailable(): Promise<Image[]> {
}
export function getDownloadingImageIds(): number[] {
return _.keys(_.pickBy(volatileState, { status: 'Downloading' })).map((i) =>
validation.checkInt(i),
) as number[];
return Object.values(runningTasks)
.filter((t) => t.context.status === 'Downloading')
.map((t) => t.context.imageId);
}
export async function cleanImageData(): Promise<void> {
@ -331,18 +410,22 @@ export async function cleanImageData(): Promise<void> {
}
export const getStatus = async () => {
const images = await getAvailable();
for (const image of images) {
image.status = 'Downloaded';
image.downloadProgress = null;
}
const status = _.clone(volatileState);
for (const image of images) {
if (status[image.imageId] == null) {
status[image.imageId] = image;
}
}
return _.values(status);
const images = (await getAvailable()).map((img) => ({
...img,
status: 'Downloaded' as Image['status'],
downloadImageSuccess: null,
}));
const imagesFromRunningTasks = Object.values(runningTasks).map(
(task) => task.context,
);
const runningImageIds = imagesFromRunningTasks.map((img) => img.imageId);
// TODO: this is possibly wrong, the value from getAvailable should be more reliable
// than the value from running tasks
return imagesFromRunningTasks.concat(
images.filter((img) => !runningImageIds.includes(img.imageId)),
);
};
export async function update(image: Image): Promise<void> {
@ -593,10 +676,7 @@ async function removeImageIfNotNeeded(image: Image): Promise<void> {
[] as string[],
);
reportChange(
image.imageId,
_.merge(_.clone(image), { status: 'Deleting' }),
);
reportEvent('start', { ...image, status: 'Deleting' });
logger.logSystemEvent(LogTypes.deleteImage, { image });
// The engine doesn't handle concurrency too well. If two requests to
@ -641,7 +721,7 @@ async function removeImageIfNotNeeded(image: Image): Promise<void> {
throw e;
}
} finally {
reportChange(image.imageId);
reportEvent('finish', image);
}
await db.models('image').del().where({ id: img.id });
@ -706,20 +786,3 @@ function fetchImage(
logger.logSystemEvent(LogTypes.downloadImage, { image });
return dockerUtils.fetchImageWithProgress(image.name, opts, onProgress);
}
// TODO: find out if imageId can actually be null
function reportChange(imageId: Nullable<number>, status?: Partial<Image>) {
if (imageId == null) {
return;
}
if (status != null) {
if (volatileState[imageId] == null) {
volatileState[imageId] = { imageId } as Image;
}
_.merge(volatileState[imageId], status);
return events.emit('change');
} else if (volatileState[imageId] != null) {
delete volatileState[imageId];
return events.emit('change');
}
}