Implement takeLock composition step + tests

This commit only implements the action that a takeLock step
results in. It does not add takeLock step generation logic
to the state funnel yet.

Signed-off-by: Christina Ying Wang <christina@balena.io>
This commit is contained in:
Christina Ying Wang 2024-02-13 21:53:05 -08:00
parent f2843e1382
commit e6df78a22b
3 changed files with 176 additions and 3 deletions

View File

@ -97,6 +97,7 @@ interface CompositionStepArgs {
noop: object;
takeLock: {
appId: number;
services: string[];
};
releaseLock: {
appId: number;
@ -283,8 +284,8 @@ export function getExecutors(app: {
noop: async () => {
/* async noop */
},
takeLock: async () => {
// TODO
takeLock: async (step) => {
await updateLock.takeLock(step.appId, step.services);
},
releaseLock: async (step) => {
await updateLock.releaseLock(step.appId);

View File

@ -8,6 +8,7 @@ import {
InternalInconsistencyError,
} from './errors';
import { pathOnRoot, pathExistsOnState } from './host-utils';
import { mkdirp } from './fs-utils';
import * as config from '../config';
import * as lockfile from './lockfile';
import { NumericIdentifier, StringIdentifier, DockerName } from '../types';
@ -76,6 +77,31 @@ async function dispose(
}
}
/**
* Composition step used by Supervisor compose module.
* Take all locks for an appId | appUuid, creating directories if they don't exist.
*/
export async function takeLock(appId: number, services: string[]) {
const release = await takeGlobalLockRW(appId);
try {
const actuallyLocked: string[] = [];
// Filter out services that already have Supervisor-taken locks.
// This needs to be done after taking the appId write lock to avoid
// race conditions with locking.
const servicesWithoutLock = services.filter(
(svc) => !getServicesLockedByAppId().isLocked(appId, svc),
);
for (const service of servicesWithoutLock) {
await mkdirp(pathOnRoot(lockPath(appId, service)));
await lockService(appId, service);
actuallyLocked.push(service);
}
return actuallyLocked;
} finally {
release();
}
}
/**
* Composition step used by Supervisor compose module.
* Release all locks for an appId | appUuid.

View File

@ -373,7 +373,153 @@ describe('lib/update-lock', () => {
};
describe('takeLock', () => {
// TODO
let testFs: TestFs.Enabled;
beforeEach(async () => {
testFs = await testfs(
{},
{ cleanup: [path.join(lockdir, '*', '*', '**.lock')] },
).enable();
// TODO: Update mocha-pod to work with creating empty directories
await mkdirp(path.join(lockdir, '1', 'server'));
await mkdirp(path.join(lockdir, '1', 'client'));
await mkdirp(path.join(lockdir, '2', 'main'));
});
afterEach(async () => {
await testFs.restore();
await fs.rm(path.join(lockdir, '1'), { recursive: true });
await fs.rm(path.join(lockdir, '2'), { recursive: true });
});
it('takes locks for a list of services for an appId', async () => {
// Take locks for appId 1
await updateLock.takeLock(1, ['server', 'client']);
// Locks should have been taken
expect(lockfile.getLocksTaken()).to.deep.include.members(
serviceLockPaths[1],
);
expect(lockfile.getLocksTaken()).to.have.length(4);
expect(
await fs.readdir(path.join(lockdir, '1', 'server')),
).to.include.members(['updates.lock', 'resin-updates.lock']);
expect(
await fs.readdir(path.join(lockdir, '1', 'client')),
).to.include.members(['updates.lock', 'resin-updates.lock']);
// Take locks for appId 2
await updateLock.takeLock(2, ['main']);
// Locks should have been taken for appid 1 & 2
expect(lockfile.getLocksTaken()).to.deep.include.members([
...serviceLockPaths[1],
...serviceLockPaths[2],
]);
expect(lockfile.getLocksTaken()).to.have.length(6);
expect(
await fs.readdir(path.join(lockdir, '2', 'main')),
).to.have.length(2);
// Clean up the lockfiles
for (const lockPath of serviceLockPaths[1].concat(
serviceLockPaths[2],
)) {
await lockfile.unlock(lockPath);
}
});
it('creates lock directory recursively if it does not exist', async () => {
// Take locks for app with nonexistent service directories
await updateLock.takeLock(3, ['api']);
// Locks should have been taken
expect(lockfile.getLocksTaken()).to.deep.include(
path.join(lockdir, '3', 'api', 'updates.lock'),
path.join(lockdir, '3', 'api', 'resin-updates.lock'),
);
// Directories should have been created
expect(await fs.readdir(path.join(lockdir))).to.deep.include.members([
'3',
]);
expect(
await fs.readdir(path.join(lockdir, '3')),
).to.deep.include.members(['api']);
// Clean up the lockfiles & created directories
await lockfile.unlock(path.join(lockdir, '3', 'api', 'updates.lock'));
await lockfile.unlock(
path.join(lockdir, '3', 'api', 'resin-updates.lock'),
);
await fs.rm(path.join(lockdir, '3'), { recursive: true });
});
it('should not take lock for services where Supervisor-taken lock already exists', async () => {
// Take locks for one service of appId 1
await lockfile.lock(serviceLockPaths[1][0]);
await lockfile.lock(serviceLockPaths[1][1]);
// Sanity check that locks are taken & tracked by Supervisor
expect(lockfile.getLocksTaken()).to.deep.include(
serviceLockPaths[1][0],
serviceLockPaths[1][1],
);
expect(lockfile.getLocksTaken()).to.have.length(2);
// Take locks using takeLock, should only lock service which doesn't
// already have locks
await expect(
updateLock.takeLock(1, ['server', 'client']),
).to.eventually.deep.include.members(['client']);
// Check that locks are taken
expect(lockfile.getLocksTaken()).to.deep.include.members(
serviceLockPaths[1],
);
// Clean up lockfiles
for (const lockPath of serviceLockPaths[1]) {
await lockfile.unlock(lockPath);
}
});
it('should error if service has a non-Supervisor-taken lock', async () => {
// Simulate a user service taking the lock for services with appId 1
for (const lockPath of serviceLockPaths[1]) {
await fs.writeFile(lockPath, '');
}
// Take locks using takeLock, should error
await expect(
updateLock.takeLock(1, ['server', 'client']),
).to.eventually.be.rejectedWith(UpdatesLockedError);
// No Supervisor locks should have been taken
expect(lockfile.getLocksTaken()).to.have.length(0);
// Clean up user-created lockfiles
for (const lockPath of serviceLockPaths[1]) {
await fs.rm(lockPath);
}
// Take locks using takeLock, should not error
await expect(
updateLock.takeLock(1, ['server', 'client']),
).to.eventually.not.be.rejectedWith(UpdatesLockedError);
// Check that locks are taken
expect(lockfile.getLocksTaken()).to.deep.include.members(
serviceLockPaths[1],
);
expect(lockfile.getLocksTaken()).to.have.length(4);
// Clean up lockfiles
for (const lockPath of serviceLockPaths[1]) {
await lockfile.unlock(lockPath);
}
});
it('waits to take locks until resource write lock is taken', async () => {
// Take the write lock for appId 1
const release = await takeGlobalLockRW(1);
// Queue takeLock, won't resolve until the write lock is released
const takeLockPromise = updateLock.takeLock(1, ['server', 'client']);
// Locks should have not been taken even after waiting
await setTimeout(500);
expect(lockfile.getLocksTaken()).to.have.length(0);
// Release the write lock
release();
// Locks should be taken
await takeLockPromise;
// Locks should have been taken
expect(lockfile.getLocksTaken()).to.deep.include.members(
serviceLockPaths[1],
);
});
});
describe('releaseLock', () => {