Add CouchDB Status Indicator ()

* Add CouchStatusIndicator

* Remove stray console log

* convert `request()` to async

* refactor

* Fix typo

* Instantiate indicator outside of object provider

- Add 'Maintenance' CouchDB status

- Add text and description for all CouchDB statuses

- Some code cleanup

* Update comments

* Add default cases to switches, make method private

* Small status text change

* Make jsdoc @private methods actually private

* Handle commonly encountered CouchDB errors

- Handle 400, 401, 404, 412, 500 status codes

- Remove `MAINTENANCE` status from this logic since that can only be assumed if receiving a 404 status from GET `{db}/_up`

* Fix tests: avoid directly calling private method

* Add some tests for indicator status

* Update docs for `CouchStatusIndicator`

* Update docs for new `CouchObjectProvider` method

* Make method private

* fix the oopsie

* Add test for 'pending' state

Co-authored-by: Scott Bell <scott@traclabs.com>
This commit is contained in:
Jesse Mazzella 2022-06-06 09:49:47 -07:00 committed by GitHub
parent 9fbb695379
commit aa0fc70e54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 351 additions and 45 deletions

@ -43,11 +43,18 @@
};
self.onerror = function (error) {
self.updateCouchStateIndicator();
console.error('🚨 Error on CouchDB feed 🚨', error);
};
self.onopen = function () {
self.updateCouchStateIndicator();
};
self.onCouchMessage = function (event) {
self.updateCouchStateIndicator();
console.debug('📩 Received message from CouchDB 📩');
const objectChanges = JSON.parse(event.data);
connections.forEach(function (connection) {
connection.postMessage({
@ -61,10 +68,39 @@
couchEventSource = new EventSource(url);
couchEventSource.onerror = self.onerror;
couchEventSource.onopen = self.onopen;
// start listening for events
couchEventSource.addEventListener('message', self.onCouchMessage);
connected = true;
console.debug('⇿ Opened connection ⇿');
};
self.updateCouchStateIndicator = function () {
const { readyState } = couchEventSource;
let message = {
type: 'state',
state: 'pending'
};
switch (readyState) {
case EventSource.CONNECTING:
message.state = 'pending';
break;
case EventSource.OPEN:
message.state = 'open';
break;
case EventSource.CLOSED:
message.state = 'close';
break;
default:
// Assume connection is closed
message.state = 'close';
console.error('🚨 Received unexpected readyState value from CouchDB EventSource feed: 🚨', readyState);
break;
}
connections.forEach(function (connection) {
connection.postMessage(message);
});
};
}());

@ -22,6 +22,7 @@
import CouchDocument from "./CouchDocument";
import CouchObjectQueue from "./CouchObjectQueue";
import { PENDING, CONNECTED, DISCONNECTED } from "./CouchStatusIndicator";
import { isNotebookType } from '../../notebook/notebook-constants.js';
const REV = "_rev";
@ -30,9 +31,10 @@ const HEARTBEAT = 50000;
const ALL_DOCS = "_all_docs?include_docs=true";
class CouchObjectProvider {
constructor(openmct, options, namespace) {
options = this._normalize(options);
constructor(openmct, options, namespace, indicator) {
options = this.#normalize(options);
this.openmct = openmct;
this.indicator = indicator;
this.url = options.url;
this.namespace = namespace;
this.objectQueue = {};
@ -45,7 +47,7 @@ class CouchObjectProvider {
/**
* @private
*/
startSharedWorker() {
#startSharedWorker() {
let provider = this;
let sharedWorker;
@ -82,6 +84,9 @@ class CouchObjectProvider {
onSharedWorkerMessage(event) {
if (event.data.type === 'connection') {
this.changesFeedSharedWorkerConnectionId = event.data.connectionId;
} else if (event.data.type === 'state') {
const state = this.#messageToIndicatorState(event.data.state);
this.indicator.setIndicatorToState(state);
} else {
let objectChanges = event.data.objectChanges;
const objectIdentifier = {
@ -103,8 +108,34 @@ class CouchObjectProvider {
}
}
/**
* Takes in a state message from the CouchDB SharedWorker and returns an IndicatorState.
* @private
* @param {'open'|'close'|'pending'} message
* @returns import('./CouchStatusIndicator').IndicatorState
*/
#messageToIndicatorState(message) {
let state;
switch (message) {
case 'open':
state = CONNECTED;
break;
case 'close':
state = DISCONNECTED;
break;
case 'pending':
state = PENDING;
break;
default:
state = PENDING;
break;
}
return state;
}
//backwards compatibility, options used to be a url. Now it's an object
_normalize(options) {
#normalize(options) {
if (typeof options === 'string') {
return {
url: options
@ -114,7 +145,7 @@ class CouchObjectProvider {
return options;
}
request(subPath, method, body, signal) {
async request(subPath, method, body, signal) {
let fetchOptions = {
method,
body,
@ -129,14 +160,32 @@ class CouchObjectProvider {
};
}
return fetch(this.url + '/' + subPath, fetchOptions)
.then((response) => {
if (response.status === CouchObjectProvider.HTTP_CONFLICT) {
throw new this.openmct.objects.errors.Conflict(`Conflict persisting ${fetchOptions.body.name}`);
}
let response = null;
try {
response = await fetch(this.url + '/' + subPath, fetchOptions);
this.indicator.setIndicatorToState(CONNECTED);
return response.json();
});
if (response.status === CouchObjectProvider.HTTP_CONFLICT) {
throw new this.openmct.objects.errors.Conflict(`Conflict persisting ${fetchOptions.body.name}`);
} else if (response.status === CouchObjectProvider.HTTP_BAD_REQUEST
|| response.status === CouchObjectProvider.HTTP_UNAUTHORIZED
|| response.status === CouchObjectProvider.HTTP_NOT_FOUND
|| response.status === CouchObjectProvider.HTTP_PRECONDITION_FAILED) {
const error = await response.json();
throw new Error(`CouchDB Error: "${error.error}: ${error.reason}"`);
} else if (response.status === CouchObjectProvider.HTTP_SERVER_ERROR) {
throw new Error('CouchDB Error: "500 Internal Server Error"');
}
return await response.json();
} catch (error) {
// Network error, CouchDB unreachable.
if (response === null) {
this.indicator.setIndicatorToState(DISCONNECTED);
}
console.error(error.message);
}
}
/**
@ -146,7 +195,7 @@ class CouchObjectProvider {
* persist any queued objects
* @private
*/
checkResponse(response, intermediateResponse, key) {
#checkResponse(response, intermediateResponse, key) {
let requestSuccess = false;
const id = response ? response.id : undefined;
let rev;
@ -166,7 +215,7 @@ class CouchObjectProvider {
this.objectQueue[id].updateRevision(rev);
this.objectQueue[id].pending = false;
if (this.objectQueue[id].hasNext()) {
this.updateQueued(id);
this.#updateQueued(id);
}
} else {
this.objectQueue[key].pending = false;
@ -176,7 +225,7 @@ class CouchObjectProvider {
/**
* @private
*/
getModel(response) {
#getModel(response) {
if (response && response.model) {
let key = response[ID];
let object = this.fromPersistedModel(response.model, key);
@ -204,7 +253,7 @@ class CouchObjectProvider {
this.batchIds.push(identifier.key);
if (this.bulkPromise === undefined) {
this.bulkPromise = this.deferBatchedGet(abortSignal);
this.bulkPromise = this.#deferBatchedGet(abortSignal);
}
return this.bulkPromise
@ -216,23 +265,23 @@ class CouchObjectProvider {
/**
* @private
*/
deferBatchedGet(abortSignal) {
#deferBatchedGet(abortSignal) {
// We until the next event loop cycle to "collect" all of the get
// requests triggered in this iteration of the event loop
return this.waitOneEventCycle().then(() => {
return this.#waitOneEventCycle().then(() => {
let batchIds = this.batchIds;
this.clearBatch();
this.#clearBatch();
if (batchIds.length === 1) {
let objectKey = batchIds[0];
//If there's only one request, just do a regular get
return this.request(objectKey, "GET", undefined, abortSignal)
.then(this.returnAsMap(objectKey));
.then(this.#returnAsMap(objectKey));
} else {
return this.bulkGet(batchIds, abortSignal);
return this.#bulkGet(batchIds, abortSignal);
}
});
}
@ -240,10 +289,10 @@ class CouchObjectProvider {
/**
* @private
*/
returnAsMap(objectKey) {
#returnAsMap(objectKey) {
return (result) => {
let objectMap = {};
objectMap[objectKey] = this.getModel(result);
objectMap[objectKey] = this.#getModel(result);
return objectMap;
};
@ -252,7 +301,7 @@ class CouchObjectProvider {
/**
* @private
*/
clearBatch() {
#clearBatch() {
this.batchIds = [];
delete this.bulkPromise;
}
@ -260,7 +309,7 @@ class CouchObjectProvider {
/**
* @private
*/
waitOneEventCycle() {
#waitOneEventCycle() {
return new Promise((resolve) => {
setTimeout(resolve);
});
@ -269,7 +318,7 @@ class CouchObjectProvider {
/**
* @private
*/
bulkGet(ids, signal) {
#bulkGet(ids, signal) {
ids = this.removeDuplicates(ids);
const query = {
@ -280,7 +329,7 @@ class CouchObjectProvider {
if (response && response.rows !== undefined) {
return response.rows.reduce((map, row) => {
if (row.doc !== undefined) {
map[row.key] = this.getModel(row.doc);
map[row.key] = this.#getModel(row.doc);
}
return map;
@ -349,7 +398,7 @@ class CouchObjectProvider {
if (json) {
let docs = json.docs;
docs.forEach(doc => {
let object = this.getModel(doc);
let object = this.#getModel(doc);
if (object) {
objects.push(object);
}
@ -368,7 +417,7 @@ class CouchObjectProvider {
this.observers[keyString].push(callback);
if (!this.isObservingObjectChanges()) {
this.observeObjectChanges();
this.#observeObjectChanges();
}
return () => {
@ -391,7 +440,7 @@ class CouchObjectProvider {
/**
* @private
*/
observeObjectChanges() {
#observeObjectChanges() {
const sseChangesPath = `${this.url}/_changes`;
const sseURL = new URL(sseChangesPath);
sseURL.searchParams.append('feed', 'eventsource');
@ -401,7 +450,7 @@ class CouchObjectProvider {
if (typeof SharedWorker === 'undefined') {
this.fetchChanges(sseURL.toString());
} else {
this.initiateSharedWorkerFetchChanges(sseURL.toString());
this.#initiateSharedWorkerFetchChanges(sseURL.toString());
}
}
@ -409,9 +458,9 @@ class CouchObjectProvider {
/**
* @private
*/
initiateSharedWorkerFetchChanges(url) {
#initiateSharedWorkerFetchChanges(url) {
if (!this.changesFeedSharedWorker) {
this.changesFeedSharedWorker = this.startSharedWorker();
this.changesFeedSharedWorker = this.#startSharedWorker();
if (this.isObservingObjectChanges()) {
this.stopObservingObjectChanges();
@ -431,7 +480,7 @@ class CouchObjectProvider {
onEventError(error) {
console.error('Error on feed', error);
if (Object.keys(this.observers).length > 0) {
this.observeObjectChanges();
this.#observeObjectChanges();
}
}
@ -475,13 +524,14 @@ class CouchObjectProvider {
// start listening for events
couchEventSource.addEventListener('message', this.onEventMessage);
console.debug('⇿ Opened connection ⇿');
}
/**
* @private
*/
getIntermediateResponse() {
#getIntermediateResponse() {
let intermediateResponse = {};
intermediateResponse.promise = new Promise(function (resolve, reject) {
intermediateResponse.resolve = resolve;
@ -509,7 +559,7 @@ class CouchObjectProvider {
}
create(model) {
let intermediateResponse = this.getIntermediateResponse();
let intermediateResponse = this.#getIntermediateResponse();
const key = model.identifier.key;
model = this.toPersistableModel(model);
this.enqueueObject(key, model, intermediateResponse);
@ -520,7 +570,7 @@ class CouchObjectProvider {
let document = new CouchDocument(key, queued.model);
this.request(key, "PUT", document).then((response) => {
console.log('create check response', key);
this.checkResponse(response, queued.intermediateResponse, key);
this.#checkResponse(response, queued.intermediateResponse, key);
}).catch(error => {
queued.intermediateResponse.reject(error);
this.objectQueue[key].pending = false;
@ -533,13 +583,13 @@ class CouchObjectProvider {
/**
* @private
*/
updateQueued(key) {
#updateQueued(key) {
if (!this.objectQueue[key].pending) {
this.objectQueue[key].pending = true;
const queued = this.objectQueue[key].dequeue();
let document = new CouchDocument(key, queued.model, this.objectQueue[key].rev);
this.request(key, "PUT", document).then((response) => {
this.checkResponse(response, queued.intermediateResponse, key);
this.#checkResponse(response, queued.intermediateResponse, key);
}).catch((error) => {
queued.intermediateResponse.reject(error);
this.objectQueue[key].pending = false;
@ -548,12 +598,12 @@ class CouchObjectProvider {
}
update(model) {
let intermediateResponse = this.getIntermediateResponse();
let intermediateResponse = this.#getIntermediateResponse();
const key = model.identifier.key;
model = this.toPersistableModel(model);
this.enqueueObject(key, model, intermediateResponse);
this.updateQueued(key);
this.#updateQueued(key);
return intermediateResponse.promise;
}
@ -577,6 +627,11 @@ class CouchObjectProvider {
}
}
CouchObjectProvider.HTTP_BAD_REQUEST = 400;
CouchObjectProvider.HTTP_UNAUTHORIZED = 401;
CouchObjectProvider.HTTP_NOT_FOUND = 404;
CouchObjectProvider.HTTP_CONFLICT = 409;
CouchObjectProvider.HTTP_PRECONDITION_FAILED = 412;
CouchObjectProvider.HTTP_SERVER_ERROR = 500;
export default CouchObjectProvider;

@ -0,0 +1,88 @@
/*****************************************************************************
* Open MCT, Copyright (c) 2014-2022, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
/**
* @typedef {Object} IndicatorState
* An object defining the visible state of the indicator.
* @property {string} statusClass - The class to apply to the indicator.
* @property {string} text - The text to display in the indicator.
* @property {string} description - The description to display in the indicator.
*/
/**
* Set of CouchDB connection states; changes among these states will be
* reflected in the indicator's appearance.
* CONNECTED: Everything nominal, expect to be able to read/write.
* DISCONNECTED: HTTP request failed (network error). Unable to reach server at all.
* PENDING: Still trying to connect, and haven't failed yet.
* MAINTENANCE: CouchDB is connected but not accepting requests.
*/
/** @type {IndicatorState} */
export const CONNECTED = {
statusClass: "s-status-on",
text: "CouchDB is connected",
description: "CouchDB is online and accepting requests."
};
/** @type {IndicatorState} */
export const PENDING = {
statusClass: "s-status-warning-lo",
text: "Attempting to connect to CouchDB...",
description: "Checking status of CouchDB, please stand by..."
};
/** @type {IndicatorState} */
export const DISCONNECTED = {
statusClass: "s-status-warning-hi",
text: "CouchDB is offline",
description: "CouchDB is offline and unavailable for requests."
};
/** @type {IndicatorState} */
export const MAINTENANCE = {
statusClass: "s-status-warning-lo",
text: "CouchDB is in maintenance mode",
description: "CouchDB is online, but not currently accepting requests."
};
export default class CouchStatusIndicator {
constructor(simpleIndicator) {
this.indicator = simpleIndicator;
this.#setDefaults();
}
/**
* Set the default values for the indicator.
* @private
*/
#setDefaults() {
this.setIndicatorToState(PENDING);
}
/**
* Set the indicator to the given state.
* @param {IndicatorState} state
*/
setIndicatorToState(state) {
this.indicator.text(state.text);
this.indicator.description(state.description);
this.indicator.statusClass(state.statusClass);
}
}

@ -22,13 +22,18 @@
import CouchObjectProvider from './CouchObjectProvider';
import CouchSearchProvider from './CouchSearchProvider';
import CouchStatusIndicator from './CouchStatusIndicator';
const NAMESPACE = '';
const LEGACY_SPACE = 'mct';
const COUCH_SEARCH_ONLY_NAMESPACE = `COUCH_SEARCH_${Date.now()}`;
export default function CouchPlugin(options) {
return function install(openmct) {
install.couchProvider = new CouchObjectProvider(openmct, options, NAMESPACE);
const simpleIndicator = openmct.indicators.simpleIndicator();
openmct.indicators.add(simpleIndicator);
const couchStatusIndicator = new CouchStatusIndicator(simpleIndicator);
install.couchProvider = new CouchObjectProvider(openmct, options, NAMESPACE, couchStatusIndicator);
// Unfortunately, for historical reasons, Couch DB produces objects with a mix of namepaces (alternately "mct", and "")
// Installing the same provider under both namespaces means that it can respond to object gets for both namespaces.

@ -19,11 +19,13 @@
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import Vue from 'vue';
import CouchPlugin from './plugin.js';
import {
createOpenMct,
resetApplicationState, spyOnBuiltins
} from 'utils/testing';
import { CONNECTED, DISCONNECTED, PENDING } from './CouchStatusIndicator';
describe('the plugin', () => {
let openmct;
@ -74,10 +76,11 @@ describe('the plugin', () => {
spyOn(provider, 'get').and.callThrough();
spyOn(provider, 'create').and.callThrough();
spyOn(provider, 'update').and.callThrough();
spyOn(provider, 'startSharedWorker').and.callThrough();
spyOn(provider, 'observe').and.callThrough();
spyOn(provider, 'fetchChanges').and.callThrough();
spyOn(provider, 'onSharedWorkerMessage').and.callThrough();
spyOn(provider, 'onEventMessage').and.callThrough();
spyOn(provider, 'isObservingObjectChanges').and.callThrough();
});
afterEach(() => {
@ -109,7 +112,7 @@ describe('the plugin', () => {
it('creates an object and starts shared worker', async () => {
const result = await openmct.objects.save(mockDomainObject);
expect(provider.create).toHaveBeenCalled();
expect(provider.startSharedWorker).toHaveBeenCalled();
expect(provider.observe).toHaveBeenCalled();
expect(result).toBeTrue();
});
@ -165,7 +168,9 @@ describe('the plugin', () => {
const result = await openmct.objects.save(mockDomainObject);
expect(result).toBeTrue();
expect(provider.create).toHaveBeenCalled();
expect(provider.startSharedWorker).not.toHaveBeenCalled();
expect(provider.observe).toHaveBeenCalled();
expect(provider.isObservingObjectChanges).toHaveBeenCalled();
expect(provider.isObservingObjectChanges.calls.mostRecent().returnValue).toBe(true);
//Set modified timestamp it detects a change and persists the updated model.
mockDomainObject.modified = mockDomainObject.persisted + 1;
@ -277,3 +282,120 @@ describe('the plugin', () => {
});
});
});
describe('the view', () => {
let openmct;
let options;
let appHolder;
let testPath = 'http://localhost:9990/openmct';
let provider;
let mockDomainObject;
beforeEach((done) => {
openmct = createOpenMct();
spyOnBuiltins(['fetch'], window);
options = {
url: testPath,
filter: {}
};
mockDomainObject = {
identifier: {
namespace: '',
key: 'some-value'
},
type: 'notebook',
modified: 0
};
openmct.install(new CouchPlugin(options));
appHolder = document.createElement('div');
document.body.appendChild(appHolder);
openmct.on('start', done);
openmct.start(appHolder);
provider = openmct.objects.getProvider(mockDomainObject.identifier);
spyOn(provider, 'onSharedWorkerMessage').and.callThrough();
});
afterEach(() => {
return resetApplicationState(openmct);
});
describe('updates CouchDB status indicator', () => {
let mockPromise;
function assertCouchIndicatorStatus(status) {
const indicator = appHolder.querySelector('.c-indicator--simple');
expect(indicator).not.toBeNull();
expect(indicator).toHaveClass(status.statusClass);
expect(indicator.textContent).toMatch(new RegExp(status.text, 'i'));
expect(indicator.title).toMatch(new RegExp(status.title, 'i'));
}
it("to 'connected' on successful request", async () => {
mockPromise = Promise.resolve({
status: 200,
json: () => {
return {
ok: true,
_id: 'some-value',
id: 'some-value',
_rev: 1,
model: {}
};
}
});
fetch.and.returnValue(mockPromise);
await openmct.objects.get({
namespace: '',
key: 'object-1'
});
await Vue.nextTick();
assertCouchIndicatorStatus(CONNECTED);
});
it("to 'disconnected' on failed request", async () => {
fetch.and.throwError(new TypeError('ERR_CONNECTION_REFUSED'));
await openmct.objects.get({
namespace: '',
key: 'object-1'
});
await Vue.nextTick();
assertCouchIndicatorStatus(DISCONNECTED);
});
it("to 'pending'", async () => {
const workerMessage = {
data: {
type: 'state',
state: 'pending'
}
};
mockPromise = Promise.resolve({
status: 200,
json: () => {
return {
ok: true,
_id: 'some-value',
id: 'some-value',
_rev: 1,
model: {}
};
}
});
fetch.and.returnValue(mockPromise);
await openmct.objects.get({
namespace: '',
key: 'object-1'
});
// Simulate 'pending' state from worker message
provider.onSharedWorkerMessage(workerMessage);
await Vue.nextTick();
assertCouchIndicatorStatus(PENDING);
});
});
});