mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-02-22 18:22:41 +00:00
Split target state set/get into separate module
This removes the circular dependency between device state index and preloading and cleans up the device state module code Change-type: patch
This commit is contained in:
parent
fedeb518e5
commit
45d1f1f964
@ -3,34 +3,28 @@ import { stripIndent } from 'common-tags';
|
|||||||
import { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
import type StrictEventEmitter from 'strict-event-emitter-types';
|
import type StrictEventEmitter from 'strict-event-emitter-types';
|
||||||
import { isRight } from 'fp-ts/lib/Either';
|
|
||||||
import Reporter from 'io-ts-reporters';
|
|
||||||
import prettyMs from 'pretty-ms';
|
import prettyMs from 'pretty-ms';
|
||||||
|
|
||||||
import * as config from '../config';
|
import * as config from '../config';
|
||||||
import * as db from '../db';
|
|
||||||
import * as logger from '../logger';
|
import * as logger from '../logger';
|
||||||
|
|
||||||
import * as globalEventBus from '../event-bus';
|
|
||||||
import * as network from '../network';
|
import * as network from '../network';
|
||||||
import * as deviceConfig from '../device-config';
|
import * as deviceConfig from '../device-config';
|
||||||
|
|
||||||
import * as constants from '../lib/constants';
|
import * as constants from '../lib/constants';
|
||||||
import * as dbus from '../lib/dbus';
|
import * as dbus from '../lib/dbus';
|
||||||
import {
|
import { takeGlobalLockRW } from '../lib/process-lock';
|
||||||
InternalInconsistencyError,
|
import { InternalInconsistencyError, UpdatesLockedError } from '../lib/errors';
|
||||||
TargetStateError,
|
|
||||||
UpdatesLockedError,
|
|
||||||
} from '../lib/errors';
|
|
||||||
import * as updateLock from '../lib/update-lock';
|
import * as updateLock from '../lib/update-lock';
|
||||||
import { takeGlobalLockRO, takeGlobalLockRW } from '../lib/process-lock';
|
|
||||||
import * as dbFormat from './db-format';
|
|
||||||
import { getGlobalApiKey } from '../lib/api-keys';
|
import { getGlobalApiKey } from '../lib/api-keys';
|
||||||
import * as sysInfo from '../lib/system-info';
|
import * as sysInfo from '../lib/system-info';
|
||||||
import { log } from '../lib/supervisor-console';
|
import { log } from '../lib/supervisor-console';
|
||||||
import { loadTargetFromFile } from './preload';
|
import { loadTargetFromFile } from './preload';
|
||||||
import * as applicationManager from '../compose/application-manager';
|
import * as applicationManager from '../compose/application-manager';
|
||||||
import * as commitStore from '../compose/commit';
|
import * as commitStore from '../compose/commit';
|
||||||
|
import type { InstancedDeviceState } from './target-state';
|
||||||
|
import * as TargetState from './target-state';
|
||||||
|
export { getTarget, setTarget } from './target-state';
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
DeviceLegacyState,
|
DeviceLegacyState,
|
||||||
@ -38,38 +32,11 @@ import type {
|
|||||||
DeviceReport,
|
DeviceReport,
|
||||||
AppState,
|
AppState,
|
||||||
} from '../types';
|
} from '../types';
|
||||||
import { TargetState } from '../types';
|
|
||||||
import type {
|
import type {
|
||||||
CompositionStepT,
|
CompositionStepT,
|
||||||
CompositionStepAction,
|
CompositionStepAction,
|
||||||
} from '../compose/composition-steps';
|
} from '../compose/composition-steps';
|
||||||
import * as fsUtils from '../lib/fs-utils';
|
|
||||||
import { pathOnRoot } from '../lib/host-utils';
|
|
||||||
import { setTimeout } from 'timers/promises';
|
import { setTimeout } from 'timers/promises';
|
||||||
import type { InstancedAppState } from '../compose/types';
|
|
||||||
|
|
||||||
const TARGET_STATE_CONFIG_DUMP = pathOnRoot(
|
|
||||||
'/tmp/balena-supervisor/target-state-config',
|
|
||||||
);
|
|
||||||
|
|
||||||
interface InstancedDeviceState {
|
|
||||||
local: {
|
|
||||||
name: string;
|
|
||||||
config: Dictionary<string>;
|
|
||||||
apps: InstancedAppState;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseTargetState(state: unknown): TargetState {
|
|
||||||
const res = TargetState.decode(state);
|
|
||||||
|
|
||||||
if (isRight(res)) {
|
|
||||||
return res.right;
|
|
||||||
}
|
|
||||||
|
|
||||||
const errors = ['Invalid target state.'].concat(Reporter.report(res));
|
|
||||||
throw new TargetStateError(errors.join('\n'));
|
|
||||||
}
|
|
||||||
|
|
||||||
interface DeviceStateEvents {
|
interface DeviceStateEvents {
|
||||||
error: Error;
|
error: Error;
|
||||||
@ -111,11 +78,9 @@ type DeviceStateStep<T extends PossibleStepTargets> =
|
|||||||
|
|
||||||
let currentVolatile: DeviceReport = {};
|
let currentVolatile: DeviceReport = {};
|
||||||
let maxPollTime: number;
|
let maxPollTime: number;
|
||||||
let intermediateTarget: InstancedDeviceState | null = null;
|
|
||||||
let applyBlocker: Nullable<Promise<void>>;
|
let applyBlocker: Nullable<Promise<void>>;
|
||||||
let cancelDelay: null | (() => void) = null;
|
let cancelDelay: null | (() => void) = null;
|
||||||
|
|
||||||
let failedUpdates: number = 0;
|
|
||||||
let applyCancelled = false;
|
let applyCancelled = false;
|
||||||
let lastApplyStart = process.hrtime();
|
let lastApplyStart = process.hrtime();
|
||||||
let scheduledApply: { force?: boolean; delay?: number } | null = null;
|
let scheduledApply: { force?: boolean; delay?: number } | null = null;
|
||||||
@ -301,96 +266,14 @@ function emitAsync<T extends keyof DeviceStateEvents>(
|
|||||||
return setImmediate(() => events.emit(ev as any, ...args));
|
return setImmediate(() => events.emit(ev as any, ...args));
|
||||||
}
|
}
|
||||||
|
|
||||||
const readLockTarget = () =>
|
|
||||||
takeGlobalLockRO('target').disposer((release) => release());
|
|
||||||
const writeLockTarget = () =>
|
|
||||||
takeGlobalLockRW('target').disposer((release) => release());
|
|
||||||
const inferStepsLock = () =>
|
const inferStepsLock = () =>
|
||||||
takeGlobalLockRW('inferSteps').disposer((release) => release());
|
takeGlobalLockRW('inferSteps').disposer((release) => release());
|
||||||
function usingReadLockTarget<T extends () => any, U extends ReturnType<T>>(
|
|
||||||
fn: T,
|
|
||||||
): Bluebird<UnwrappedPromise<U>> {
|
|
||||||
return Bluebird.using(readLockTarget, () => fn());
|
|
||||||
}
|
|
||||||
function usingWriteLockTarget<T extends () => any, U extends ReturnType<T>>(
|
|
||||||
fn: T,
|
|
||||||
): Bluebird<UnwrappedPromise<U>> {
|
|
||||||
return Bluebird.using(writeLockTarget, () => fn());
|
|
||||||
}
|
|
||||||
function usingInferStepsLock<T extends () => any, U extends ReturnType<T>>(
|
function usingInferStepsLock<T extends () => any, U extends ReturnType<T>>(
|
||||||
fn: T,
|
fn: T,
|
||||||
): Bluebird<UnwrappedPromise<U>> {
|
): Bluebird<UnwrappedPromise<U>> {
|
||||||
return Bluebird.using(inferStepsLock, () => fn());
|
return Bluebird.using(inferStepsLock, () => fn());
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function setTarget(target: TargetState, localSource?: boolean) {
|
|
||||||
await db.initialized();
|
|
||||||
await config.initialized();
|
|
||||||
|
|
||||||
// When we get a new target state, clear any built up apply errors
|
|
||||||
// This means that we can attempt to apply the new state instantly
|
|
||||||
if (localSource == null) {
|
|
||||||
localSource = false;
|
|
||||||
}
|
|
||||||
failedUpdates = 0;
|
|
||||||
|
|
||||||
// This will throw if target state is invalid
|
|
||||||
target = parseTargetState(target);
|
|
||||||
|
|
||||||
globalEventBus.getInstance().emit('targetStateChanged', target);
|
|
||||||
|
|
||||||
const { uuid, apiEndpoint } = await config.getMany(['uuid', 'apiEndpoint']);
|
|
||||||
|
|
||||||
if (!uuid || !target[uuid]) {
|
|
||||||
throw new Error(
|
|
||||||
`Expected target state for local device with uuid '${uuid}'.`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const localTarget = target[uuid];
|
|
||||||
|
|
||||||
await fsUtils.writeAndSyncFile(
|
|
||||||
TARGET_STATE_CONFIG_DUMP,
|
|
||||||
JSON.stringify(localTarget.config),
|
|
||||||
);
|
|
||||||
|
|
||||||
await usingWriteLockTarget(async () => {
|
|
||||||
await db.transaction(async (trx) => {
|
|
||||||
await config.set({ name: localTarget.name }, trx);
|
|
||||||
await deviceConfig.setTarget(localTarget.config, trx);
|
|
||||||
|
|
||||||
if (localSource || apiEndpoint == null || apiEndpoint === '') {
|
|
||||||
await applicationManager.setTarget(localTarget.apps, 'local', trx);
|
|
||||||
} else {
|
|
||||||
await applicationManager.setTarget(localTarget.apps, apiEndpoint, trx);
|
|
||||||
}
|
|
||||||
await config.set({ targetStateSet: true }, trx);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
export function getTarget({
|
|
||||||
initial = false,
|
|
||||||
intermediate = false,
|
|
||||||
}: {
|
|
||||||
initial?: boolean;
|
|
||||||
intermediate?: boolean;
|
|
||||||
} = {}): Bluebird<InstancedDeviceState> {
|
|
||||||
return usingReadLockTarget(async () => {
|
|
||||||
if (intermediate) {
|
|
||||||
return intermediateTarget!;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
local: {
|
|
||||||
name: await config.get('name'),
|
|
||||||
config: await deviceConfig.getTarget({ initial }),
|
|
||||||
apps: await dbFormat.getApps(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// This returns the current state of the device in (more or less)
|
// This returns the current state of the device in (more or less)
|
||||||
// the same format as the target state. This method,
|
// the same format as the target state. This method,
|
||||||
// getCurrent and getCurrentForComparison should probably get
|
// getCurrent and getCurrentForComparison should probably get
|
||||||
@ -649,7 +532,7 @@ function applyError(
|
|||||||
if (intermediate) {
|
if (intermediate) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
failedUpdates += 1;
|
TargetState.increaseFailedUpdates();
|
||||||
reportCurrentState({ update_failed: true });
|
reportCurrentState({ update_failed: true });
|
||||||
if (scheduledApply != null) {
|
if (scheduledApply != null) {
|
||||||
if (!(err instanceof UpdatesLockedError)) {
|
if (!(err instanceof UpdatesLockedError)) {
|
||||||
@ -660,7 +543,7 @@ function applyError(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const delay = Math.min(
|
const delay = Math.min(
|
||||||
Math.pow(2, failedUpdates) * constants.backoffIncrement,
|
Math.pow(2, TargetState.getFailedUpdates()) * constants.backoffIncrement,
|
||||||
maxPollTime,
|
maxPollTime,
|
||||||
);
|
);
|
||||||
// If there was an error then schedule another attempt briefly in the future.
|
// If there was an error then schedule another attempt briefly in the future.
|
||||||
@ -697,7 +580,7 @@ export const applyTarget = async ({
|
|||||||
return usingInferStepsLock(async () => {
|
return usingInferStepsLock(async () => {
|
||||||
const [currentState, targetState] = await Promise.all([
|
const [currentState, targetState] = await Promise.all([
|
||||||
getCurrentState(),
|
getCurrentState(),
|
||||||
getTarget({ initial, intermediate }),
|
TargetState.getTarget({ initial, intermediate }),
|
||||||
]);
|
]);
|
||||||
const deviceConfigSteps = await deviceConfig.getRequiredSteps(
|
const deviceConfigSteps = await deviceConfig.getRequiredSteps(
|
||||||
currentState,
|
currentState,
|
||||||
@ -744,7 +627,7 @@ export const applyTarget = async ({
|
|||||||
if (!intermediate) {
|
if (!intermediate) {
|
||||||
log.debug('Finished applying target state');
|
log.debug('Finished applying target state');
|
||||||
applicationManager.resetTimeSpentFetching();
|
applicationManager.resetTimeSpentFetching();
|
||||||
failedUpdates = 0;
|
TargetState.resetFailedUpdates();
|
||||||
lastSuccessfulUpdate = Date.now();
|
lastSuccessfulUpdate = Date.now();
|
||||||
reportCurrentState({
|
reportCurrentState({
|
||||||
update_failed: false,
|
update_failed: false,
|
||||||
@ -885,14 +768,14 @@ export async function applyIntermediateTarget(
|
|||||||
) {
|
) {
|
||||||
return pausingApply(async () => {
|
return pausingApply(async () => {
|
||||||
// TODO: Make sure we don't accidentally overwrite this
|
// TODO: Make sure we don't accidentally overwrite this
|
||||||
intermediateTarget = intermediate;
|
TargetState.setIntermediateTarget(intermediate);
|
||||||
applyInProgress = true;
|
applyInProgress = true;
|
||||||
return applyTarget({
|
return applyTarget({
|
||||||
intermediate: true,
|
intermediate: true,
|
||||||
force,
|
force,
|
||||||
keepVolumes,
|
keepVolumes,
|
||||||
}).then(() => {
|
}).then(() => {
|
||||||
intermediateTarget = null;
|
TargetState.setIntermediateTarget(null);
|
||||||
applyInProgress = false;
|
applyInProgress = false;
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -4,7 +4,7 @@ import { promises as fs } from 'fs';
|
|||||||
import type { Image } from '../compose/images';
|
import type { Image } from '../compose/images';
|
||||||
import { imageFromService } from '../compose/images';
|
import { imageFromService } from '../compose/images';
|
||||||
import { NumericIdentifier } from '../types';
|
import { NumericIdentifier } from '../types';
|
||||||
import * as deviceState from '../device-state';
|
import { setTarget } from './target-state';
|
||||||
import * as config from '../config';
|
import * as config from '../config';
|
||||||
import * as deviceConfig from '../device-config';
|
import * as deviceConfig from '../device-config';
|
||||||
import * as eventTracker from '../event-tracker';
|
import * as eventTracker from '../event-tracker';
|
||||||
@ -136,7 +136,7 @@ export async function loadTargetFromFile(appsPath: string): Promise<boolean> {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
await deviceState.setTarget(localState);
|
await setTarget(localState);
|
||||||
await migrateAppsJson(appsPath);
|
await migrateAppsJson(appsPath);
|
||||||
log.success('Preloading complete');
|
log.success('Preloading complete');
|
||||||
if (preloadState.pinDevice) {
|
if (preloadState.pinDevice) {
|
||||||
|
145
src/device-state/target-state.ts
Normal file
145
src/device-state/target-state.ts
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
import Bluebird from 'bluebird';
|
||||||
|
import { isRight } from 'fp-ts/lib/Either';
|
||||||
|
import Reporter from 'io-ts-reporters';
|
||||||
|
|
||||||
|
import * as config from '../config';
|
||||||
|
import * as db from '../db';
|
||||||
|
|
||||||
|
import * as globalEventBus from '../event-bus';
|
||||||
|
import * as deviceConfig from '../device-config';
|
||||||
|
|
||||||
|
import { TargetStateError } from '../lib/errors';
|
||||||
|
import { takeGlobalLockRO, takeGlobalLockRW } from '../lib/process-lock';
|
||||||
|
import * as dbFormat from './db-format';
|
||||||
|
import * as applicationManager from '../compose/application-manager';
|
||||||
|
|
||||||
|
import { TargetState } from '../types';
|
||||||
|
import * as fsUtils from '../lib/fs-utils';
|
||||||
|
import { pathOnRoot } from '../lib/host-utils';
|
||||||
|
import type { InstancedAppState } from '../compose/types';
|
||||||
|
|
||||||
|
const TARGET_STATE_CONFIG_DUMP = pathOnRoot(
|
||||||
|
'/tmp/balena-supervisor/target-state-config',
|
||||||
|
);
|
||||||
|
|
||||||
|
export interface InstancedDeviceState {
|
||||||
|
local: {
|
||||||
|
name: string;
|
||||||
|
config: Dictionary<string>;
|
||||||
|
apps: InstancedAppState;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseTargetState(state: unknown): TargetState {
|
||||||
|
const res = TargetState.decode(state);
|
||||||
|
|
||||||
|
if (isRight(res)) {
|
||||||
|
return res.right;
|
||||||
|
}
|
||||||
|
|
||||||
|
const errors = ['Invalid target state.'].concat(Reporter.report(res));
|
||||||
|
throw new TargetStateError(errors.join('\n'));
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: avoid singletons
|
||||||
|
let failedUpdates: number = 0;
|
||||||
|
let intermediateTarget: InstancedDeviceState | null = null;
|
||||||
|
|
||||||
|
const readLockTarget = () =>
|
||||||
|
takeGlobalLockRO('target').disposer((release) => release());
|
||||||
|
const writeLockTarget = () =>
|
||||||
|
takeGlobalLockRW('target').disposer((release) => release());
|
||||||
|
function usingReadLockTarget<T extends () => any, U extends ReturnType<T>>(
|
||||||
|
fn: T,
|
||||||
|
): Bluebird<UnwrappedPromise<U>> {
|
||||||
|
return Bluebird.using(readLockTarget, () => fn());
|
||||||
|
}
|
||||||
|
function usingWriteLockTarget<T extends () => any, U extends ReturnType<T>>(
|
||||||
|
fn: T,
|
||||||
|
): Bluebird<UnwrappedPromise<U>> {
|
||||||
|
return Bluebird.using(writeLockTarget, () => fn());
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resetFailedUpdates() {
|
||||||
|
failedUpdates = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function increaseFailedUpdates() {
|
||||||
|
failedUpdates += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getFailedUpdates() {
|
||||||
|
return failedUpdates;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function setIntermediateTarget(tgt: InstancedDeviceState | null) {
|
||||||
|
intermediateTarget = tgt;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function setTarget(target: TargetState, localSource?: boolean) {
|
||||||
|
await db.initialized();
|
||||||
|
await config.initialized();
|
||||||
|
|
||||||
|
// When we get a new target state, clear any built up apply errors
|
||||||
|
// This means that we can attempt to apply the new state instantly
|
||||||
|
if (localSource == null) {
|
||||||
|
localSource = false;
|
||||||
|
}
|
||||||
|
failedUpdates = 0;
|
||||||
|
|
||||||
|
// This will throw if target state is invalid
|
||||||
|
target = parseTargetState(target);
|
||||||
|
|
||||||
|
globalEventBus.getInstance().emit('targetStateChanged', target);
|
||||||
|
|
||||||
|
const { uuid, apiEndpoint } = await config.getMany(['uuid', 'apiEndpoint']);
|
||||||
|
|
||||||
|
if (!uuid || !target[uuid]) {
|
||||||
|
throw new Error(
|
||||||
|
`Expected target state for local device with uuid '${uuid}'.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const localTarget = target[uuid];
|
||||||
|
|
||||||
|
await fsUtils.writeAndSyncFile(
|
||||||
|
TARGET_STATE_CONFIG_DUMP,
|
||||||
|
JSON.stringify(localTarget.config),
|
||||||
|
);
|
||||||
|
|
||||||
|
await usingWriteLockTarget(async () => {
|
||||||
|
await db.transaction(async (trx) => {
|
||||||
|
await config.set({ name: localTarget.name }, trx);
|
||||||
|
await deviceConfig.setTarget(localTarget.config, trx);
|
||||||
|
|
||||||
|
if (localSource || apiEndpoint == null || apiEndpoint === '') {
|
||||||
|
await applicationManager.setTarget(localTarget.apps, 'local', trx);
|
||||||
|
} else {
|
||||||
|
await applicationManager.setTarget(localTarget.apps, apiEndpoint, trx);
|
||||||
|
}
|
||||||
|
await config.set({ targetStateSet: true }, trx);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getTarget({
|
||||||
|
initial = false,
|
||||||
|
intermediate = false,
|
||||||
|
}: {
|
||||||
|
initial?: boolean;
|
||||||
|
intermediate?: boolean;
|
||||||
|
} = {}): Bluebird<InstancedDeviceState> {
|
||||||
|
return usingReadLockTarget(async () => {
|
||||||
|
if (intermediate) {
|
||||||
|
return intermediateTarget!;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
local: {
|
||||||
|
name: await config.get('name'),
|
||||||
|
config: await deviceConfig.getTarget({ initial }),
|
||||||
|
apps: await dbFormat.getApps(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user