mirror of
https://github.com/nasa/openmct.git
synced 2025-06-21 16:49:42 +00:00
Batch Couch DB create calls (#6779)
* Implement persistence batching for Couch DB * Add tests for persistence batching --------- Co-authored-by: Shefali Joshi <simplyrender@gmail.com>
This commit is contained in:
@ -24,6 +24,7 @@ import CouchDocument from './CouchDocument';
|
|||||||
import CouchObjectQueue from './CouchObjectQueue';
|
import CouchObjectQueue from './CouchObjectQueue';
|
||||||
import { PENDING, CONNECTED, DISCONNECTED, UNKNOWN } from './CouchStatusIndicator';
|
import { PENDING, CONNECTED, DISCONNECTED, UNKNOWN } from './CouchStatusIndicator';
|
||||||
import { isNotebookOrAnnotationType } from '../../notebook/notebook-constants.js';
|
import { isNotebookOrAnnotationType } from '../../notebook/notebook-constants.js';
|
||||||
|
import _ from 'lodash';
|
||||||
|
|
||||||
const REV = '_rev';
|
const REV = '_rev';
|
||||||
const ID = '_id';
|
const ID = '_id';
|
||||||
@ -42,6 +43,8 @@ class CouchObjectProvider {
|
|||||||
this.batchIds = [];
|
this.batchIds = [];
|
||||||
this.onEventMessage = this.onEventMessage.bind(this);
|
this.onEventMessage = this.onEventMessage.bind(this);
|
||||||
this.onEventError = this.onEventError.bind(this);
|
this.onEventError = this.onEventError.bind(this);
|
||||||
|
this.flushPersistenceQueue = _.debounce(this.flushPersistenceQueue.bind(this));
|
||||||
|
this.persistenceQueue = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -668,9 +671,12 @@ class CouchObjectProvider {
|
|||||||
if (!this.objectQueue[key].pending) {
|
if (!this.objectQueue[key].pending) {
|
||||||
this.objectQueue[key].pending = true;
|
this.objectQueue[key].pending = true;
|
||||||
const queued = this.objectQueue[key].dequeue();
|
const queued = this.objectQueue[key].dequeue();
|
||||||
let document = new CouchDocument(key, queued.model);
|
let couchDocument = new CouchDocument(key, queued.model);
|
||||||
document.metadata.created = Date.now();
|
couchDocument.metadata.created = Date.now();
|
||||||
this.request(key, 'PUT', document)
|
this.#enqueueForPersistence({
|
||||||
|
key,
|
||||||
|
document: couchDocument
|
||||||
|
})
|
||||||
.then((response) => {
|
.then((response) => {
|
||||||
this.#checkResponse(response, queued.intermediateResponse, key);
|
this.#checkResponse(response, queued.intermediateResponse, key);
|
||||||
})
|
})
|
||||||
@ -683,6 +689,42 @@ class CouchObjectProvider {
|
|||||||
return intermediateResponse.promise;
|
return intermediateResponse.promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#enqueueForPersistence({ key, document }) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.persistenceQueue.push({
|
||||||
|
key,
|
||||||
|
document,
|
||||||
|
resolve,
|
||||||
|
reject
|
||||||
|
});
|
||||||
|
this.flushPersistenceQueue();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async flushPersistenceQueue() {
|
||||||
|
if (this.persistenceQueue.length > 1) {
|
||||||
|
const batch = {
|
||||||
|
docs: this.persistenceQueue.map((queued) => queued.document)
|
||||||
|
};
|
||||||
|
const response = await this.request('_bulk_docs', 'POST', batch);
|
||||||
|
response.forEach((responseMetadatum) => {
|
||||||
|
const queued = this.persistenceQueue.find(
|
||||||
|
(queuedMetadatum) => queuedMetadatum.key === responseMetadatum.id
|
||||||
|
);
|
||||||
|
if (responseMetadatum.ok) {
|
||||||
|
queued.resolve(responseMetadatum);
|
||||||
|
} else {
|
||||||
|
queued.reject(responseMetadatum);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else if (this.persistenceQueue.length === 1) {
|
||||||
|
const { key, document, resolve, reject } = this.persistenceQueue[0];
|
||||||
|
|
||||||
|
this.request(key, 'PUT', document).then(resolve).catch(reject);
|
||||||
|
}
|
||||||
|
this.persistenceQueue = [];
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @private
|
* @private
|
||||||
*/
|
*/
|
||||||
|
@ -243,6 +243,135 @@ describe('the plugin', () => {
|
|||||||
expect(requestMethod).toEqual('GET');
|
expect(requestMethod).toEqual('GET');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
describe('batches persistence', () => {
|
||||||
|
let successfulMockPromise;
|
||||||
|
let partialFailureMockPromise;
|
||||||
|
let objectsToPersist;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
successfulMockPromise = Promise.resolve({
|
||||||
|
json: () => {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
id: 'object-1',
|
||||||
|
ok: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'object-2',
|
||||||
|
ok: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'object-3',
|
||||||
|
ok: true
|
||||||
|
}
|
||||||
|
];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
partialFailureMockPromise = Promise.resolve({
|
||||||
|
json: () => {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
id: 'object-1',
|
||||||
|
ok: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'object-2',
|
||||||
|
ok: false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'object-3',
|
||||||
|
ok: true
|
||||||
|
}
|
||||||
|
];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
objectsToPersist = [
|
||||||
|
{
|
||||||
|
identifier: {
|
||||||
|
namespace: '',
|
||||||
|
key: 'object-1'
|
||||||
|
},
|
||||||
|
name: 'object-1',
|
||||||
|
type: 'folder',
|
||||||
|
modified: 0
|
||||||
|
},
|
||||||
|
{
|
||||||
|
identifier: {
|
||||||
|
namespace: '',
|
||||||
|
key: 'object-2'
|
||||||
|
},
|
||||||
|
name: 'object-2',
|
||||||
|
type: 'folder',
|
||||||
|
modified: 0
|
||||||
|
},
|
||||||
|
{
|
||||||
|
identifier: {
|
||||||
|
namespace: '',
|
||||||
|
key: 'object-3'
|
||||||
|
},
|
||||||
|
name: 'object-3',
|
||||||
|
type: 'folder',
|
||||||
|
modified: 0
|
||||||
|
}
|
||||||
|
];
|
||||||
|
});
|
||||||
|
it('for multiple simultaneous successful saves', async () => {
|
||||||
|
fetch.and.returnValue(successfulMockPromise);
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
objectsToPersist.map((objectToPersist) => openmct.objects.save(objectToPersist))
|
||||||
|
);
|
||||||
|
|
||||||
|
const requestUrl = fetch.calls.mostRecent().args[0];
|
||||||
|
const requestMethod = fetch.calls.mostRecent().args[1].method;
|
||||||
|
const requestBody = JSON.parse(fetch.calls.mostRecent().args[1].body);
|
||||||
|
|
||||||
|
expect(fetch).toHaveBeenCalledTimes(1);
|
||||||
|
expect(requestUrl.includes('_bulk_docs')).toBeTrue();
|
||||||
|
expect(requestMethod).toEqual('POST');
|
||||||
|
expect(
|
||||||
|
objectsToPersist.every(
|
||||||
|
(object, index) => object.identifier.key === requestBody.docs[index]._id
|
||||||
|
)
|
||||||
|
).toBeTrue();
|
||||||
|
});
|
||||||
|
it('for multiple simultaneous saves with partial failure', async () => {
|
||||||
|
fetch.and.returnValue(partialFailureMockPromise);
|
||||||
|
|
||||||
|
let saveResults = await Promise.all(
|
||||||
|
objectsToPersist.map((objectToPersist) =>
|
||||||
|
openmct.objects
|
||||||
|
.save(objectToPersist)
|
||||||
|
.then(() => true)
|
||||||
|
.catch(() => false)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
expect(saveResults[0]).toBeTrue();
|
||||||
|
expect(saveResults[1]).toBeFalse();
|
||||||
|
expect(saveResults[2]).toBeTrue();
|
||||||
|
});
|
||||||
|
it('except for a single save', async () => {
|
||||||
|
fetch.and.returnValue({
|
||||||
|
json: () => {
|
||||||
|
return {
|
||||||
|
id: 'object-1',
|
||||||
|
ok: true
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await openmct.objects.save(objectsToPersist[0]);
|
||||||
|
|
||||||
|
const requestUrl = fetch.calls.mostRecent().args[0];
|
||||||
|
const requestMethod = fetch.calls.mostRecent().args[1].method;
|
||||||
|
|
||||||
|
expect(fetch).toHaveBeenCalledTimes(1);
|
||||||
|
expect(requestUrl.includes('_bulk_docs')).toBeFalse();
|
||||||
|
expect(requestUrl.endsWith('object-1')).toBeTrue();
|
||||||
|
expect(requestMethod).toEqual('PUT');
|
||||||
|
});
|
||||||
|
});
|
||||||
describe('implements server-side search', () => {
|
describe('implements server-side search', () => {
|
||||||
let mockPromise;
|
let mockPromise;
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
|
Reference in New Issue
Block a user