diff --git a/src/plugins/persistence/couch/CouchObjectProvider.js b/src/plugins/persistence/couch/CouchObjectProvider.js index b73d18fb3f..05e022637e 100644 --- a/src/plugins/persistence/couch/CouchObjectProvider.js +++ b/src/plugins/persistence/couch/CouchObjectProvider.js @@ -199,6 +199,11 @@ class CouchObjectProvider { } let response = null; + + if (!this.isObservingObjectChanges()) { + this.#observeObjectChanges(); + } + try { response = await fetch(this.url + '/' + subPath, fetchOptions); const { status } = response; @@ -471,9 +476,6 @@ class CouchObjectProvider { this.observers[keyString] = this.observers[keyString].filter(observer => observer !== callback); if (this.observers[keyString].length === 0) { delete this.observers[keyString]; - if (Object.keys(this.observers).length === 0 && this.isObservingObjectChanges()) { - this.stopObservingObjectChanges(); - } } } }; @@ -498,7 +500,6 @@ class CouchObjectProvider { } else { this.#initiateSharedWorkerFetchChanges(sseURL.toString()); } - } /** @@ -525,18 +526,24 @@ class CouchObjectProvider { onEventError(error) { console.error('Error on feed', error); - if (Object.keys(this.observers).length > 0) { - this.#observeObjectChanges(); - } + const { readyState } = error.target; + this.#updateIndicatorStatus(readyState); + } + + onEventOpen(event) { + const { readyState } = event.target; + this.#updateIndicatorStatus(readyState); } onEventMessage(event) { + const { readyState } = event.target; const eventData = JSON.parse(event.data); const identifier = { namespace: this.namespace, key: eventData.id }; const keyString = this.openmct.objects.makeKeyString(identifier); + this.#updateIndicatorStatus(readyState); let observersForObject = this.observers[keyString]; if (observersForObject) { @@ -559,17 +566,18 @@ class CouchObjectProvider { this.stopObservingObjectChanges = () => { controller.abort(); - couchEventSource.removeEventListener('message', this.onEventMessage); + couchEventSource.removeEventListener('message', this.onEventMessage.bind(this)); delete this.stopObservingObjectChanges; }; console.debug('⇿ Opening CouchDB change feed connection ⇿'); couchEventSource = new EventSource(url); - couchEventSource.onerror = this.onEventError; + couchEventSource.onerror = this.onEventError.bind(this); + couchEventSource.onopen = this.onEventOpen.bind(this); // start listening for events - couchEventSource.addEventListener('message', this.onEventMessage); + couchEventSource.addEventListener('message', this.onEventMessage.bind(this)); console.debug('⇿ Opened connection ⇿'); } @@ -587,6 +595,31 @@ class CouchObjectProvider { return intermediateResponse; } + /** + * Update the indicator status based on the readyState of the EventSource + * @private + */ + #updateIndicatorStatus(readyState) { + let message; + switch (readyState) { + case EventSource.CONNECTING: + message = 'pending'; + break; + case EventSource.OPEN: + message = 'open'; + break; + case EventSource.CLOSED: + message = 'close'; + break; + default: + message = 'unknown'; + break; + } + + const indicatorState = this.#messageToIndicatorState(message); + this.indicator.setIndicatorToState(indicatorState); + } + /** * @private */ diff --git a/src/plugins/persistence/couch/pluginSpec.js b/src/plugins/persistence/couch/pluginSpec.js index 751f56e424..7810f6f3e0 100644 --- a/src/plugins/persistence/couch/pluginSpec.js +++ b/src/plugins/persistence/couch/pluginSpec.js @@ -152,7 +152,10 @@ describe('the plugin', () => { mockDomainObject.id = mockDomainObject.identifier.key; const fakeUpdateEvent = { - data: JSON.stringify(mockDomainObject) + data: JSON.stringify(mockDomainObject), + target: { + readyState: EventSource.CONNECTED + } }; // eslint-disable-next-line require-await @@ -170,7 +173,6 @@ describe('the plugin', () => { expect(provider.create).toHaveBeenCalled(); expect(provider.observe).toHaveBeenCalled(); expect(provider.isObservingObjectChanges).toHaveBeenCalled(); - expect(provider.isObservingObjectChanges.calls.mostRecent().returnValue).toBe(true); //Set modified timestamp it detects a change and persists the updated model. mockDomainObject.modified = mockDomainObject.persisted + 1; @@ -181,6 +183,7 @@ describe('the plugin', () => { expect(updatedResult).toBeTrue(); expect(provider.update).toHaveBeenCalled(); expect(provider.fetchChanges).toHaveBeenCalled(); + expect(provider.isObservingObjectChanges.calls.mostRecent().returnValue).toBe(true); sharedWorkerCallback(fakeUpdateEvent); expect(provider.onEventMessage).toHaveBeenCalled();