mirror of
https://github.com/nasa/openmct.git
synced 2025-03-23 04:25:27 +00:00
[CouchDB] Always subscribe to the CouchDB changes feed (#5434)
* Add unknown state, remove maintenance state * Handle all CouchDB status codes - Set unknown status if we receive an unhandled code * Include status code in error messages * SharedWorker can send unknown status * Add test for unknown status * Always subscribe to CouchDB changes feed - Always subscribe to the CouchDB changes feed, even if there are no observable objects, since we are also checking the status of CouchDB via this feed. * Update indicator status if not using SharedWorker * Start listening to changes feed on first request * fix test * adjust test to hopefully avoid race condition * lint Co-authored-by: John Hill <john.c.hill@nasa.gov> Co-authored-by: Andrew Henry <akhenry@gmail.com> Co-authored-by: Scott Bell <scott@traclabs.com>
This commit is contained in:
parent
51196530fd
commit
69153fe8f0
src/plugins/persistence/couch
@ -199,6 +199,11 @@ class CouchObjectProvider {
|
||||
}
|
||||
|
||||
let response = null;
|
||||
|
||||
if (!this.isObservingObjectChanges()) {
|
||||
this.#observeObjectChanges();
|
||||
}
|
||||
|
||||
try {
|
||||
response = await fetch(this.url + '/' + subPath, fetchOptions);
|
||||
const { status } = response;
|
||||
@ -471,9 +476,6 @@ class CouchObjectProvider {
|
||||
this.observers[keyString] = this.observers[keyString].filter(observer => observer !== callback);
|
||||
if (this.observers[keyString].length === 0) {
|
||||
delete this.observers[keyString];
|
||||
if (Object.keys(this.observers).length === 0 && this.isObservingObjectChanges()) {
|
||||
this.stopObservingObjectChanges();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -498,7 +500,6 @@ class CouchObjectProvider {
|
||||
} else {
|
||||
this.#initiateSharedWorkerFetchChanges(sseURL.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -525,18 +526,24 @@ class CouchObjectProvider {
|
||||
|
||||
onEventError(error) {
|
||||
console.error('Error on feed', error);
|
||||
if (Object.keys(this.observers).length > 0) {
|
||||
this.#observeObjectChanges();
|
||||
}
|
||||
const { readyState } = error.target;
|
||||
this.#updateIndicatorStatus(readyState);
|
||||
}
|
||||
|
||||
onEventOpen(event) {
|
||||
const { readyState } = event.target;
|
||||
this.#updateIndicatorStatus(readyState);
|
||||
}
|
||||
|
||||
onEventMessage(event) {
|
||||
const { readyState } = event.target;
|
||||
const eventData = JSON.parse(event.data);
|
||||
const identifier = {
|
||||
namespace: this.namespace,
|
||||
key: eventData.id
|
||||
};
|
||||
const keyString = this.openmct.objects.makeKeyString(identifier);
|
||||
this.#updateIndicatorStatus(readyState);
|
||||
let observersForObject = this.observers[keyString];
|
||||
|
||||
if (observersForObject) {
|
||||
@ -559,17 +566,18 @@ class CouchObjectProvider {
|
||||
|
||||
this.stopObservingObjectChanges = () => {
|
||||
controller.abort();
|
||||
couchEventSource.removeEventListener('message', this.onEventMessage);
|
||||
couchEventSource.removeEventListener('message', this.onEventMessage.bind(this));
|
||||
delete this.stopObservingObjectChanges;
|
||||
};
|
||||
|
||||
console.debug('⇿ Opening CouchDB change feed connection ⇿');
|
||||
|
||||
couchEventSource = new EventSource(url);
|
||||
couchEventSource.onerror = this.onEventError;
|
||||
couchEventSource.onerror = this.onEventError.bind(this);
|
||||
couchEventSource.onopen = this.onEventOpen.bind(this);
|
||||
|
||||
// start listening for events
|
||||
couchEventSource.addEventListener('message', this.onEventMessage);
|
||||
couchEventSource.addEventListener('message', this.onEventMessage.bind(this));
|
||||
|
||||
console.debug('⇿ Opened connection ⇿');
|
||||
}
|
||||
@ -587,6 +595,31 @@ class CouchObjectProvider {
|
||||
return intermediateResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the indicator status based on the readyState of the EventSource
|
||||
* @private
|
||||
*/
|
||||
#updateIndicatorStatus(readyState) {
|
||||
let message;
|
||||
switch (readyState) {
|
||||
case EventSource.CONNECTING:
|
||||
message = 'pending';
|
||||
break;
|
||||
case EventSource.OPEN:
|
||||
message = 'open';
|
||||
break;
|
||||
case EventSource.CLOSED:
|
||||
message = 'close';
|
||||
break;
|
||||
default:
|
||||
message = 'unknown';
|
||||
break;
|
||||
}
|
||||
|
||||
const indicatorState = this.#messageToIndicatorState(message);
|
||||
this.indicator.setIndicatorToState(indicatorState);
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
|
@ -152,7 +152,10 @@ describe('the plugin', () => {
|
||||
mockDomainObject.id = mockDomainObject.identifier.key;
|
||||
|
||||
const fakeUpdateEvent = {
|
||||
data: JSON.stringify(mockDomainObject)
|
||||
data: JSON.stringify(mockDomainObject),
|
||||
target: {
|
||||
readyState: EventSource.CONNECTED
|
||||
}
|
||||
};
|
||||
|
||||
// eslint-disable-next-line require-await
|
||||
@ -170,7 +173,6 @@ describe('the plugin', () => {
|
||||
expect(provider.create).toHaveBeenCalled();
|
||||
expect(provider.observe).toHaveBeenCalled();
|
||||
expect(provider.isObservingObjectChanges).toHaveBeenCalled();
|
||||
expect(provider.isObservingObjectChanges.calls.mostRecent().returnValue).toBe(true);
|
||||
|
||||
//Set modified timestamp it detects a change and persists the updated model.
|
||||
mockDomainObject.modified = mockDomainObject.persisted + 1;
|
||||
@ -181,6 +183,7 @@ describe('the plugin', () => {
|
||||
expect(updatedResult).toBeTrue();
|
||||
expect(provider.update).toHaveBeenCalled();
|
||||
expect(provider.fetchChanges).toHaveBeenCalled();
|
||||
expect(provider.isObservingObjectChanges.calls.mostRecent().returnValue).toBe(true);
|
||||
sharedWorkerCallback(fakeUpdateEvent);
|
||||
expect(provider.onEventMessage).toHaveBeenCalled();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user