mirror of
https://github.com/balena-os/balena-supervisor.git
synced 2025-03-12 07:23:58 +00:00
Ensure the first target state request is applied
During first time run of the supervisor, the target state is queried by `reportInitialEnv`. Since this happens early on the initialization process, this target state report is missed by any listeners and this can lead to the initial target state not beeing applied (see #1455). This PR ensures that target state is re-emitted if there were no listeners setup on call to update. Change-type: patch Signed-off-by: Felipe Lalanne <felipe@balena.io> Connects-to: #1455
This commit is contained in:
parent
50751b7716
commit
e4e895630f
@ -31,11 +31,14 @@ export const emitter: StrictEventEmitter<
|
|||||||
const lockGetTarget = () =>
|
const lockGetTarget = () =>
|
||||||
writeLock('getTarget').disposer((release) => release());
|
writeLock('getTarget').disposer((release) => release());
|
||||||
|
|
||||||
let cache: {
|
type CachedResponse = {
|
||||||
etag?: string | string[];
|
etag?: string | string[];
|
||||||
body: TargetState;
|
body: TargetState;
|
||||||
|
emitted?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let cache: CachedResponse;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* appUpdatePollInterval is set when startPoll successfuly queries the config
|
* appUpdatePollInterval is set when startPoll successfuly queries the config
|
||||||
*/
|
*/
|
||||||
@ -53,6 +56,39 @@ let appUpdatePollInterval: number;
|
|||||||
});
|
});
|
||||||
})();
|
})();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emit target state from a cached response if there are any listeners available.
|
||||||
|
*
|
||||||
|
* If no listeners are available and the cached response has not been emitted it
|
||||||
|
* returns false.
|
||||||
|
*
|
||||||
|
* @param cachedResponse the response to emit
|
||||||
|
* @param force Emitted with the 'target-state-update' event update as necessary
|
||||||
|
* @param isFromApi Emitted with the 'target-state-update' event update as necessary
|
||||||
|
* @return true if the response has been emitted or false otherwise
|
||||||
|
*/
|
||||||
|
const emitTargetState = (
|
||||||
|
cachedResponse: CachedResponse,
|
||||||
|
force = false,
|
||||||
|
isFromApi = false,
|
||||||
|
): boolean => {
|
||||||
|
if (
|
||||||
|
!cachedResponse.emitted &&
|
||||||
|
emitter.listenerCount('target-state-update') > 0
|
||||||
|
) {
|
||||||
|
emitter.emit(
|
||||||
|
'target-state-update',
|
||||||
|
_.cloneDeep(cachedResponse.body),
|
||||||
|
force,
|
||||||
|
isFromApi,
|
||||||
|
);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return !!cache.emitted;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The last fetch attempt
|
* The last fetch attempt
|
||||||
*
|
*
|
||||||
@ -107,7 +143,9 @@ export const update = async (
|
|||||||
.timeout(apiTimeout);
|
.timeout(apiTimeout);
|
||||||
|
|
||||||
if (statusCode === 304) {
|
if (statusCode === 304) {
|
||||||
// There's no change so no need to update the cache or emit a change event
|
// There's no change so no need to update the cache
|
||||||
|
// only emit the target state if it hasn't been emitted yet
|
||||||
|
cache.emitted = emitTargetState(cache, force, isFromApi);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +159,8 @@ export const update = async (
|
|||||||
body,
|
body,
|
||||||
};
|
};
|
||||||
|
|
||||||
emitter.emit('target-state-update', _.cloneDeep(body), force, isFromApi);
|
// Emit the target state and update the cache
|
||||||
|
cache.emitted = emitTargetState(cache, force, isFromApi);
|
||||||
}).finally(() => {
|
}).finally(() => {
|
||||||
lastFetch = process.hrtime();
|
lastFetch = process.hrtime();
|
||||||
});
|
});
|
||||||
|
235
test/40-target-state.spec.ts
Normal file
235
test/40-target-state.spec.ts
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
import { SinonStub, stub, spy, SinonSpy } from 'sinon';
|
||||||
|
import { Promise } from 'bluebird';
|
||||||
|
|
||||||
|
import * as _ from 'lodash';
|
||||||
|
|
||||||
|
import { expect } from './lib/chai-config';
|
||||||
|
import * as TargetState from '../src/device-state/target-state';
|
||||||
|
import * as request from '../src/lib/request';
|
||||||
|
|
||||||
|
const stateEndpointBody = {
|
||||||
|
local: {
|
||||||
|
name: 'solitary-bush',
|
||||||
|
config: {},
|
||||||
|
apps: {
|
||||||
|
'123': {
|
||||||
|
name: 'my-app',
|
||||||
|
services: {},
|
||||||
|
volumes: {},
|
||||||
|
networks: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
dependent: {
|
||||||
|
apps: {},
|
||||||
|
devices: {},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
describe('Target state', () => {
|
||||||
|
describe('update', () => {
|
||||||
|
it('should emit target state when a new one is available', async () => {
|
||||||
|
const req = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 200,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
JSON.stringify(stateEndpointBody),
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
spy(req, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(req as any);
|
||||||
|
|
||||||
|
// Setup target state listener
|
||||||
|
const listener = stub();
|
||||||
|
TargetState.emitter.on('target-state-update', listener);
|
||||||
|
|
||||||
|
// Perform target state request
|
||||||
|
await TargetState.update();
|
||||||
|
|
||||||
|
expect(request.getRequestInstance).to.be.calledOnce;
|
||||||
|
|
||||||
|
// The getAsync method gets called once and includes authorization headers
|
||||||
|
expect(req.getAsync).to.be.calledOnce;
|
||||||
|
expect(
|
||||||
|
_.has((req.getAsync as SinonSpy).args[0], ['headers', 'Authorization']),
|
||||||
|
);
|
||||||
|
|
||||||
|
// The listener should receive the endpoint
|
||||||
|
expect(listener).to.be.calledOnceWith(JSON.stringify(stateEndpointBody));
|
||||||
|
|
||||||
|
// Remove getRequestInstance stub
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
|
||||||
|
// new request returns 304
|
||||||
|
const newReq = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 304,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
spy(newReq, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(newReq as any);
|
||||||
|
|
||||||
|
// Perform new target state request
|
||||||
|
await TargetState.update();
|
||||||
|
|
||||||
|
// The new req should have been called
|
||||||
|
expect(newReq.getAsync).to.be.calledOnce;
|
||||||
|
|
||||||
|
// No new calls to the listener
|
||||||
|
expect(listener).to.be.calledOnce;
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
TargetState.emitter.off('target-state-update', listener);
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should emit cached target state if there was no listener for the cached state', async () => {
|
||||||
|
const req = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 200,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
JSON.stringify(stateEndpointBody),
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
spy(req, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(req as any);
|
||||||
|
|
||||||
|
// Perform target state request
|
||||||
|
await TargetState.update();
|
||||||
|
|
||||||
|
expect(request.getRequestInstance).to.be.calledOnce;
|
||||||
|
|
||||||
|
// The getAsync method gets called once and includes authorization headers
|
||||||
|
expect(req.getAsync).to.be.calledOnce;
|
||||||
|
expect(
|
||||||
|
_.has((req.getAsync as SinonSpy).args[0], ['headers', 'Authorization']),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Remove getRequestInstance stub
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
|
||||||
|
// new request returns 304
|
||||||
|
const newReq = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 304,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
spy(newReq, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(newReq as any);
|
||||||
|
|
||||||
|
// Setup target state listener after the first request
|
||||||
|
const listener = stub();
|
||||||
|
TargetState.emitter.on('target-state-update', listener);
|
||||||
|
|
||||||
|
// Perform new target state request
|
||||||
|
await TargetState.update();
|
||||||
|
|
||||||
|
// The new req should have been called
|
||||||
|
expect(newReq.getAsync).to.be.calledOnce;
|
||||||
|
|
||||||
|
// The listener should receive the endpoint
|
||||||
|
expect(listener).to.be.calledOnceWith(JSON.stringify(stateEndpointBody));
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
TargetState.emitter.off('target-state-update', listener);
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('get', () => {
|
||||||
|
it('returns the latest target state endpoint response', async () => {
|
||||||
|
const req = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 200,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
JSON.stringify(stateEndpointBody),
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup spies and stubs
|
||||||
|
spy(req, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(req as any);
|
||||||
|
|
||||||
|
// Perform target state request
|
||||||
|
const response = await TargetState.get();
|
||||||
|
|
||||||
|
// The stubbed methods should only be called once
|
||||||
|
expect(request.getRequestInstance).to.be.calledOnce;
|
||||||
|
expect(req.getAsync).to.be.calledOnce;
|
||||||
|
|
||||||
|
// Cached value should reflect latest response
|
||||||
|
expect(response).to.be.equal(JSON.stringify(stateEndpointBody));
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns the last cached target state', async () => {
|
||||||
|
const req = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 200,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
JSON.stringify(stateEndpointBody),
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup spies and stubs
|
||||||
|
stub(request, 'getRequestInstance').resolves(req as any);
|
||||||
|
|
||||||
|
// Perform target state request, this should
|
||||||
|
// put the query result in the cache
|
||||||
|
await TargetState.update();
|
||||||
|
|
||||||
|
// Reset the stub
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
const newReq = {
|
||||||
|
getAsync: () =>
|
||||||
|
Promise.resolve([
|
||||||
|
{
|
||||||
|
statusCode: 304,
|
||||||
|
headers: {},
|
||||||
|
} as any,
|
||||||
|
]),
|
||||||
|
};
|
||||||
|
spy(newReq, 'getAsync');
|
||||||
|
stub(request, 'getRequestInstance').resolves(newReq as any);
|
||||||
|
|
||||||
|
// Perform target state request
|
||||||
|
const response = await TargetState.get();
|
||||||
|
|
||||||
|
// The stubbed methods should only be called once
|
||||||
|
expect(request.getRequestInstance).to.be.calledOnce;
|
||||||
|
expect(newReq.getAsync).to.be.calledOnce;
|
||||||
|
|
||||||
|
// Cached value should reflect latest response
|
||||||
|
expect(response).to.be.equal(JSON.stringify(stateEndpointBody));
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
(request.getRequestInstance as SinonStub).restore();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
Loading…
x
Reference in New Issue
Block a user