2020-05-16 14:08:14 +01:00
|
|
|
import { EventEmitter } from 'events';
|
|
|
|
import * as url from 'url';
|
2023-10-12 16:40:00 +01:00
|
|
|
import { setTimeout } from 'timers/promises';
|
2023-02-20 22:11:27 -08:00
|
|
|
import * as Bluebird from 'bluebird';
|
2020-05-16 14:08:14 +01:00
|
|
|
import type StrictEventEmitter from 'strict-event-emitter-types';
|
|
|
|
|
|
|
|
import type { TargetState } from '../types/state';
|
|
|
|
import { InternalInconsistencyError } from '../lib/errors';
|
2022-11-21 20:53:25 +00:00
|
|
|
import { getGotInstance } from '../lib/request';
|
2020-05-16 14:08:14 +01:00
|
|
|
import * as config from '../config';
|
|
|
|
import { writeLock } from '../lib/update-lock';
|
2023-02-20 17:38:05 -08:00
|
|
|
import * as constants from '../lib/constants';
|
2020-06-09 13:47:50 +01:00
|
|
|
import log from '../lib/supervisor-console';
|
|
|
|
|
|
|
|
/**
 * Error thrown when the API answers a target state request with an
 * unexpected status code.
 *
 * The `name` property is set explicitly so that logs and stack traces show
 * `ApiResponseError: ...` instead of the generic `Error: ...`.
 */
export class ApiResponseError extends Error {
	constructor(...args: ConstructorParameters<typeof Error>) {
		super(...args);
		this.name = 'ApiResponseError';
	}
}
|
2020-05-16 14:08:14 +01:00
|
|
|
|
|
|
|
/**
 * Events published on the target state emitter, keyed by event name.
 */
interface TargetStateEvents {
	/** Carries a target state that has not been emitted to listeners before */
	'target-state-update': (
		targetState: TargetState,
		force: boolean,
		isFromApi: boolean,
	) => void;

	/** Signals that an already emitted target state should be applied */
	'target-state-apply': (force: boolean, isFromApi: boolean) => void;
}
|
2022-09-19 16:33:52 +01:00
|
|
|
/**
 * Event emitter restricted, at the type level, to the events declared in
 * TargetStateEvents.
 */
export const emitter: StrictEventEmitter<
	EventEmitter,
	TargetStateEvents
> = new EventEmitter();
|
2020-05-16 14:08:14 +01:00
|
|
|
|
|
|
|
/**
 * Takes the 'getTarget' write lock and wraps it in a disposer whose cleanup
 * step calls the lock's release function.
 */
const lockGetTarget = () => {
	const pendingLock = writeLock('getTarget');
	return pendingLock.disposer((unlock) => unlock());
};
|
|
|
|
|
2020-11-11 11:46:55 -03:00
|
|
|
/**
 * Snapshot of the last target state response kept in memory.
 */
type CachedResponse = {
	/** Value of the response `etag` header, sent back as `If-None-Match` */
	etag?: string | string[];

	/** The target state received in the response body */
	body: TargetState;

	/** Result of the last attempt to emit this response to listeners */
	emitted?: boolean;
};
|
|
|
|
|
2020-11-11 11:46:55 -03:00
|
|
|
// Last response stored by update(). It has no initializer, so it stays
// undefined until update() stores the first response; that is why update()
// reads the etag through optional chaining.
let cache: CachedResponse;
|
|
|
|
|
2020-05-16 14:08:14 +01:00
|
|
|
/**
|
|
|
|
* appUpdatePollInterval is set when startPoll successfully queries the config
|
|
|
|
*/
|
|
|
|
// Written by startPoll (initial config read) and by the config 'change'
// listener it registers; read by poll to compute the delay between fetches.
let appUpdatePollInterval: number;
|
|
|
|
|
2020-11-11 11:46:55 -03:00
|
|
|
/**
 * Notifies listeners about a cached target state response.
 *
 * - If the response has not been emitted yet and 'target-state-update' has at
 *   least one listener, a deep copy of the body is emitted as an update.
 * - If the response was already emitted and the check was triggered through
 *   the API, 'target-state-apply' is emitted instead (when it has listeners).
 * - Otherwise nothing is emitted.
 *
 * @param cachedResponse the response to emit
 * @param force Emitted with the event so listeners can act on it as necessary
 * @param isFromApi Emitted with the event; also required for 'target-state-apply' to fire
 * @returns true if an event was emitted by this call, otherwise the current
 * `emitted` flag of `cachedResponse` normalized to a boolean
 */
const emitTargetState = (
	cachedResponse: CachedResponse,
	force = false,
	isFromApi = false,
): boolean => {
	if (
		!cachedResponse.emitted &&
		emitter.listenerCount('target-state-update') > 0
	) {
		// CachedResponse has not been emitted before so emit as an update.
		// Listeners get a clone so they cannot mutate the cached body.
		emitter.emit(
			'target-state-update',
			structuredClone(cachedResponse.body),
			force,
			isFromApi,
		);
		return true;
	} else if (
		cachedResponse.emitted &&
		isFromApi &&
		emitter.listenerCount('target-state-apply') > 0
	) {
		// CachedResponse has been emitted but a client triggered the check so emit an apply
		emitter.emit('target-state-apply', force, isFromApi);
		return true;
	}

	// Nothing was emitted: report the flag of the response we were given
	// (not the module-level cache), normalized to be a boolean
	return !!cachedResponse.emitted;
};
|
|
|
|
|
2020-05-16 14:08:14 +01:00
|
|
|
/**
|
|
|
|
* The last fetch attempt
|
|
|
|
*
|
|
|
|
* We set a value rather than leaving it undeclared because having it undefined
|
|
|
|
* adds more overhead to dealing with this value without any benefits.
|
|
|
|
*/
|
|
|
|
// process.hrtime() yields a [seconds, nanoseconds] tuple
export let lastFetch: [number, number] = process.hrtime();
|
|
|
|
|
|
|
|
/**
 * Fetches the target state from the API and refreshes the in-memory cache.
 *
 * The request runs while holding the lock provided by lockGetTarget and sends
 * the cached etag as `If-None-Match`. A 304 answer for a cached etag keeps the
 * cache as is; any other status outside the 2xx range is logged and rejected
 * with an ApiResponseError; a 2xx answer replaces the cache. In the two
 * non-error cases the cached response is then handed to emitTargetState.
 * `lastFetch` is refreshed whether the attempt succeeds or fails.
 *
 * @param force Emitted with the 'target-state-update' event update as necessary
 * @param isFromApi Emitted with the 'target-state-update' event update as necessary
 * @throws InternalInconsistencyError when the configured apiEndpoint is not a string
 * @throws ApiResponseError when the API answers with an unexpected status code
 */
export const update = async (
	// TODO: Is there a better way than passing these params here just to emit them if there is an update?
	force = false,
	isFromApi = false,
): Promise<void> => {
	await config.initialized();

	const fetchAndCache = async () => {
		const { uuid, apiEndpoint, apiTimeout, deviceApiKey } =
			await config.getMany([
				'uuid',
				'apiEndpoint',
				'apiTimeout',
				'deviceApiKey',
			]);

		if (typeof apiEndpoint !== 'string') {
			throw new InternalInconsistencyError(
				'Non-string apiEndpoint passed to ApiBinder.getTargetState',
			);
		}

		const stateUrl = url.resolve(apiEndpoint, `/device/v3/${uuid}/state`);
		const request = await getGotInstance();

		const requestHeaders = {
			Authorization: `Bearer ${deviceApiKey}`,
			'If-None-Match': cache?.etag,
		};
		// TODO: We use the same default timeout for all of these in order to have a timeout generally
		// but it would probably make sense to tune them individually
		const requestTimeouts = {
			lookup: apiTimeout,
			connect: apiTimeout,
			secureConnect: apiTimeout,
			socket: apiTimeout,
			send: apiTimeout,
			response: apiTimeout,
		};

		const response = await request(stateUrl, {
			headers: requestHeaders,
			timeout: requestTimeouts,
		});
		const status = response.statusCode;

		// A 304 only counts as "no change" when we actually hold a cached etag
		const notModified = status === 304 && cache?.etag != null;
		if (!notModified) {
			if (status < 200 || status >= 300) {
				const message = `Error from the API: ${status}`;
				log.error(message);
				throw new ApiResponseError(message);
			}

			// New content: replace the cache before emitting
			cache = {
				etag: response.headers.etag,
				body: response.body as any,
			};
		}

		// Emit the cached target state (if it hasn't been emitted yet) and
		// record the outcome on the cache
		cache.emitted = emitTargetState(cache, force, isFromApi);
	};

	return Bluebird.using(lockGetTarget(), fetchAndCache).finally(() => {
		lastFetch = process.hrtime();
	});
};
|
|
|
|
|
|
|
|
/**
 * Polls the API for target state updates forever.
 *
 * Each cycle fetches the target state (unless the first fetch is skipped) and
 * then sleeps before the next cycle. This is implemented as a loop rather
 * than by having poll call itself, so that completed cycles do not leave a
 * chain of pending promises and closures behind.
 *
 * @param skipFirstGet when true, the first cycle only waits and does not fetch
 * @param fetchErrors number of consecutive failed fetches so far, used to
 * compute the back off delay
 * @returns a promise that is not expected to settle
 */
const poll = async (
	skipFirstGet: boolean = false,
	fetchErrors: number = 0,
): Promise<void> => {
	let skipFetch = skipFirstGet;

	for (;;) {
		// Add random jitter up to `maxApiJitterDelay` to space out poll requests
		// NOTE: the jitter has as objective to reduce the load on networks that have
		// devices on the 1000s. It's not meant to ease the load on the API as the backend performs
		// other operations to deal with scaling issues
		let pollInterval =
			Math.random() * constants.maxApiJitterDelay + appUpdatePollInterval;

		if (!skipFetch) {
			// Try to fetch latest target state
			try {
				await update();
				// Reset fetchErrors because we successfully updated
				fetchErrors = 0;
			} catch {
				// Exponential back off if request fails, never waiting longer
				// than the configured poll interval
				pollInterval = Math.min(
					appUpdatePollInterval,
					15000 * 2 ** fetchErrors,
				);
				++fetchErrors;
			}
		}
		// Only the very first cycle may be skipped
		skipFetch = false;

		// Wait until we want to poll again
		await setTimeout(pollInterval);
	}
};
|
|
|
|
|
|
|
|
/**
 * Refreshes the target state through update() and resolves with a deep copy
 * of the cached body, so callers cannot mutate the cache.
 */
export const get = async (): Promise<TargetState> => {
	await update();

	const { body } = cache;
	return structuredClone(body);
};
|
|
|
|
|
|
|
|
/**
 * Start polling for target state updates
 *
 * startPoll will try to query the config for all values needed
 * to begin polling. If there is an issue obtaining these values
 * the function will wait 10 seconds and retry until successful.
 *
 * Retries happen in a loop and the config 'change' listener is registered at
 * most once per call, so failed attempts do not pile up duplicate listeners.
 */
export const startPoll = async (): Promise<void> => {
	let instantUpdates;
	// Guards against registering the 'change' listener again on every retry
	let listeningForChanges = false;

	for (;;) {
		try {
			await config.initialized();

			if (!listeningForChanges) {
				/**
				 * Listen for config changes to appUpdatePollInterval
				 */
				config.on('change', (changedConfig) => {
					if (changedConfig.appUpdatePollInterval) {
						appUpdatePollInterval = changedConfig.appUpdatePollInterval;
					}
				});
				listeningForChanges = true;
			}

			// Query and set config values we need to avoid multiple db hits
			const { instantUpdates: updates, appUpdatePollInterval: interval } =
				await config.getMany(['instantUpdates', 'appUpdatePollInterval']);
			instantUpdates = updates;
			appUpdatePollInterval = interval;
			break;
		} catch {
			// Delay 10 seconds and retry loading config
			await setTimeout(10000);
		}
	}

	// All config values fetched, start polling!
	// The first fetch is skipped when instant updates are disabled.
	return poll(!instantUpdates);
};
|