Couchdb object synchronization (#3674)

* Implements ObjectAPI changes to refresh objects when an update is received from the database.
* Populates a virtual folder of plans from CouchDB
* Fixes bug with supportsMutation API call parameters
This commit is contained in:
Shefali Joshi 2021-02-22 18:35:11 -08:00 committed by GitHub
parent dd3d4c8c3a
commit 29128a891d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 513 additions and 68 deletions

View File

@ -219,7 +219,7 @@ define([
* @memberof module:openmct.MCT#
* @name objects
*/
this.objects = new api.ObjectAPI.default(this.types);
this.objects = new api.ObjectAPI.default(this.types, this);
/**
* An interface for retrieving and interpreting telemetry data associated

View File

@ -196,7 +196,7 @@ define([
this.provider.add(this.domainObject, child.identifier);
} else {
if (this.returnMutables && this.publicAPI.objects.supportsMutation(child)) {
if (this.returnMutables && this.publicAPI.objects.supportsMutation(child.identifier)) {
let keyString = this.publicAPI.objects.makeKeyString(child.identifier);
child = this.publicAPI.objects._toMutable(child);

View File

@ -96,6 +96,16 @@ class MutableDomainObject {
//TODO: Emit events for listeners of child properties when parent changes.
// Do it at observer time - also register observers for parent attribute path.
}
$refresh(model) {
//TODO: Currently we are updating the entire object.
// In the future we could update a specific property of the object using the 'path' parameter.
this._globalEventEmitter.emit(qualifiedEventName(this, '$_synchronize_model'), model);
//Emit wildcard event, with path so that callback knows what changed
this._globalEventEmitter.emit(qualifiedEventName(this, '*'), this);
}
$on(event, callback) {
this._instanceEventEmitter.on(event, callback);

View File

@ -33,11 +33,15 @@ import InterceptorRegistry from './InterceptorRegistry';
* @memberof module:openmct
*/
function ObjectAPI(typeRegistry) {
function ObjectAPI(typeRegistry, openmct) {
this.typeRegistry = typeRegistry;
this.eventEmitter = new EventEmitter();
this.providers = {};
this.rootRegistry = new RootRegistry();
this.injectIdentifierService = function () {
this.identifierService = openmct.$injector.get("identifierService");
};
this.rootProvider = new RootObjectProvider(this.rootRegistry);
this.cache = {};
this.interceptorRegistry = new InterceptorRegistry();
@ -51,16 +55,33 @@ ObjectAPI.prototype.supersecretSetFallbackProvider = function (p) {
this.fallbackProvider = p;
};
/**
* @private
*/
ObjectAPI.prototype.getIdentifierService = function () {
// Lazily acquire identifier service
if (!this.identifierService) {
this.injectIdentifierService();
}
return this.identifierService;
};
/**
* Retrieve the provider for a given identifier.
* @private
*/
ObjectAPI.prototype.getProvider = function (identifier) {
//handles the '' vs 'mct' namespace issue
const keyString = utils.makeKeyString(identifier);
const identifierService = this.getIdentifierService();
const namespace = identifierService.parse(keyString).getSpace();
if (identifier.key === 'ROOT') {
return this.rootProvider;
}
return this.providers[identifier.namespace] || this.fallbackProvider;
return this.providers[namespace] || this.fallbackProvider;
};
/**
@ -207,13 +228,29 @@ ObjectAPI.prototype.search = function (query, options) {
* @returns {Promise.<MutableDomainObject>} a promise that will resolve with a MutableDomainObject if
* the object can be mutated.
*/
ObjectAPI.prototype.getMutable = function (identifier) {
if (!this.supportsMutation(identifier)) {
throw new Error(`Object "${this.makeKeyString(identifier)}" does not support mutation.`);
ObjectAPI.prototype.getMutable = function (idOrKeyString) {
if (!this.supportsMutation(idOrKeyString)) {
throw new Error(`Object "${this.makeKeyString(idOrKeyString)}" does not support mutation.`);
}
return this.get(identifier).then((object) => {
return this._toMutable(object);
return this.get(idOrKeyString).then((object) => {
const mutableDomainObject = this._toMutable(object);
// Check if provider supports realtime updates
let identifier = utils.parseKeyString(idOrKeyString);
let provider = this.getProvider(identifier);
if (provider !== undefined
&& provider.observe !== undefined) {
let unobserve = provider.observe(identifier, (updatedModel) => {
mutableDomainObject.$refresh(updatedModel);
});
mutableDomainObject.$on('$destroy', () => {
unobserve();
});
}
return mutableDomainObject;
});
};

View File

@ -3,6 +3,8 @@ import ObjectAPI from './ObjectAPI.js';
describe("The Object API", () => {
let objectAPI;
let typeRegistry;
let openmct = {};
let mockIdentifierService;
let mockDomainObject;
const TEST_NAMESPACE = "test-namespace";
const FIFTEEN_MINUTES = 15 * 60 * 1000;
@ -11,7 +13,19 @@ describe("The Object API", () => {
typeRegistry = jasmine.createSpyObj('typeRegistry', [
'get'
]);
objectAPI = new ObjectAPI(typeRegistry);
openmct.$injector = jasmine.createSpyObj('$injector', ['get']);
mockIdentifierService = jasmine.createSpyObj(
'identifierService',
['parse']
);
mockIdentifierService.parse.and.returnValue({
getSpace: () => {
return TEST_NAMESPACE;
}
});
openmct.$injector.get.and.returnValue(mockIdentifierService);
objectAPI = new ObjectAPI(typeRegistry, openmct);
mockDomainObject = {
identifier: {
namespace: TEST_NAMESPACE,
@ -136,11 +150,13 @@ describe("The Object API", () => {
describe("the mutation API", () => {
let testObject;
let updatedTestObject;
let mutable;
let mockProvider;
let callbacks = [];
beforeEach(function () {
objectAPI = new ObjectAPI(typeRegistry);
objectAPI = new ObjectAPI(typeRegistry, openmct);
testObject = {
identifier: {
namespace: TEST_NAMESPACE,
@ -154,12 +170,27 @@ describe("The Object API", () => {
}
}
};
updatedTestObject = Object.assign({otherAttribute: 'changed-attribute-value'}, testObject);
mockProvider = jasmine.createSpyObj("mock provider", [
"get",
"create",
"update"
"update",
"observe",
"observeObjectChanges"
]);
mockProvider.get.and.returnValue(Promise.resolve(testObject));
mockProvider.observeObjectChanges.and.callFake(() => {
callbacks[0](updatedTestObject);
callbacks.splice(0, 1);
});
mockProvider.observe.and.callFake((id, callback) => {
if (callbacks.length === 0) {
callbacks.push(callback);
} else {
callbacks[0] = callback;
}
});
objectAPI.addProvider(TEST_NAMESPACE, mockProvider);
return objectAPI.getMutable(testObject.identifier)
@ -191,6 +222,13 @@ describe("The Object API", () => {
it('that is identical to original object when serialized', function () {
expect(JSON.stringify(mutable)).toEqual(JSON.stringify(testObject));
});
it('that observes for object changes', function () {
let mockListener = jasmine.createSpy('mockListener');
objectAPI.observe(testObject, '*', mockListener);
mockProvider.observeObjectChanges();
expect(mockListener).toHaveBeenCalled();
});
});
describe('uses events', function () {

View File

@ -0,0 +1,37 @@
export default function (couchPlugin, searchFilter) {
return function install(openmct) {
const couchProvider = couchPlugin.couchProvider;
openmct.objects.addRoot({
namespace: 'couch-search',
key: 'couch-search'
});
openmct.objects.addProvider('couch-search', {
get(identifier) {
if (identifier.key !== 'couch-search') {
return undefined;
} else {
return Promise.resolve({
identifier,
type: 'folder',
name: "CouchDB Documents"
});
}
}
});
openmct.composition.addProvider({
appliesTo(domainObject) {
return domainObject.identifier.namespace === 'couch-search'
&& domainObject.identifier.key === 'couch-search';
},
load() {
return couchProvider.getObjectsByFilter(searchFilter).then(objects => {
return objects.map(object => object.identifier);
});
}
});
};
}

View File

@ -0,0 +1,91 @@
/*****************************************************************************
* Open MCT, Copyright (c) 2014-2020, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import {createOpenMct, resetApplicationState} from "utils/testing";
import CouchDBSearchFolderPlugin from './plugin';
describe('the plugin', function () {
let identifier = {
namespace: 'couch-search',
key: "couch-search"
};
let testPath = '/test/db';
let openmct;
let composition;
beforeEach((done) => {
openmct = createOpenMct();
let couchPlugin = openmct.plugins.CouchDB(testPath);
openmct.install(couchPlugin);
openmct.install(new CouchDBSearchFolderPlugin(couchPlugin, {
"selector": {
"model": {
"type": "plan"
}
}
}));
openmct.on('start', done);
openmct.startHeadless();
composition = openmct.composition.get({identifier});
spyOn(couchPlugin.couchProvider, 'getObjectsByFilter').and.returnValue(Promise.resolve([
{
identifier: {
key: "1",
namespace: "mct"
}
},
{
identifier: {
key: "2",
namespace: "mct"
}
}
]));
});
afterEach(() => {
return resetApplicationState(openmct);
});
it('provides a folder to hold plans', () => {
openmct.objects.get(identifier).then((object) => {
expect(object).toEqual({
identifier,
type: 'folder',
name: "CouchDB Documents"
});
});
});
it('provides composition for couch search folders', () => {
composition.load().then((objects) => {
expect(objects.length).toEqual(2);
});
});
});

View File

@ -109,10 +109,23 @@ const selectedPage = {
};
let openmct;
let mockIdentifierService;
describe('Notebook Entries:', () => {
beforeEach(done => {
openmct = createOpenMct();
openmct.$injector = jasmine.createSpyObj('$injector', ['get']);
mockIdentifierService = jasmine.createSpyObj(
'identifierService',
['parse']
);
mockIdentifierService.parse.and.returnValue({
getSpace: () => {
return '';
}
});
openmct.$injector.get.and.returnValue(mockIdentifierService);
openmct.types.addType('notebook', {
creatable: true
});

View File

@ -25,13 +25,36 @@ import CouchObjectQueue from "./CouchObjectQueue";
const REV = "_rev";
const ID = "_id";
const HEARTBEAT = 50000;
export default class CouchObjectProvider {
constructor(openmct, url, namespace) {
// options {
// url: couchdb url,
// disableObserve: disable auto feed from couchdb to keep objects in sync,
// filter: selector to find objects to sync in couchdb
// }
constructor(openmct, options, namespace) {
options = this._normalize(options);
this.openmct = openmct;
this.url = url;
this.url = options.url;
this.namespace = namespace;
this.objectQueue = {};
this.observeEnabled = options.disableObserve !== true;
this.observers = {};
if (this.observeEnabled) {
this.observeObjectChanges(options.filter);
}
}
//backwards compatibility, options used to be a url. Now it's an object
_normalize(options) {
if (typeof options === 'string') {
return {
url: options
};
}
return options;
}
request(subPath, method, value) {
@ -102,6 +125,162 @@ export default class CouchObjectProvider {
return this.request(identifier.key, "GET").then(this.getModel.bind(this));
}
async getObjectsByFilter(filter) {
let objects = [];
let url = `${this.url}/_find`;
let body = {};
if (filter) {
body = JSON.stringify(filter);
}
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body
});
const reader = response.body.getReader();
let completed = false;
while (!completed) {
const {done, value} = await reader.read();
//done is true when we lose connection with the provider
if (done) {
completed = true;
}
if (value) {
let chunk = new Uint8Array(value.length);
chunk.set(value, 0);
const decodedChunk = new TextDecoder("utf-8").decode(chunk);
try {
const json = JSON.parse(decodedChunk);
if (json) {
let docs = json.docs;
docs.forEach(doc => {
let object = this.getModel(doc);
if (object) {
objects.push(object);
}
});
}
} catch (e) {
//do nothing
}
}
}
return objects;
}
observe(identifier, callback) {
if (!this.observeEnabled) {
return;
}
const keyString = this.openmct.objects.makeKeyString(identifier);
this.observers[keyString] = this.observers[keyString] || [];
this.observers[keyString].push(callback);
return () => {
this.observers[keyString] = this.observers[keyString].filter(observer => observer !== callback);
};
}
abortGetChanges() {
if (this.controller) {
this.controller.abort();
this.controller = undefined;
}
return true;
}
async observeObjectChanges(filter) {
let intermediateResponse = this.getIntermediateResponse();
if (!this.observeEnabled) {
intermediateResponse.reject('Observe for changes is disabled');
}
const controller = new AbortController();
const signal = controller.signal;
if (this.controller) {
this.abortGetChanges();
}
this.controller = controller;
// feed=continuous maintains an indefinitely open connection with a keep-alive of HEARTBEAT milliseconds until this client closes the connection
// style=main_only returns only the current winning revision of the document
let url = `${this.url}/_changes?feed=continuous&style=main_only&heartbeat=${HEARTBEAT}`;
let body = {};
if (filter) {
url = `${url}&filter=_selector`;
body = JSON.stringify(filter);
}
const response = await fetch(url, {
method: 'POST',
signal,
headers: {
"Content-Type": 'application/json'
},
body
});
const reader = response.body.getReader();
let completed = false;
while (!completed) {
const {done, value} = await reader.read();
//done is true when we lose connection with the provider
if (done) {
completed = true;
}
if (value) {
let chunk = new Uint8Array(value.length);
chunk.set(value, 0);
const decodedChunk = new TextDecoder("utf-8").decode(chunk).split('\n');
if (decodedChunk.length && decodedChunk[decodedChunk.length - 1] === '') {
decodedChunk.forEach((doc, index) => {
try {
const object = JSON.parse(doc);
object.identifier = {
namespace: this.namespace,
key: object.id
};
let keyString = this.openmct.objects.makeKeyString(object.identifier);
let observersForObject = this.observers[keyString];
if (observersForObject) {
observersForObject.forEach(async (observer) => {
const updatedObject = await this.get(object.identifier);
observer(updatedObject);
});
}
} catch (e) {
//do nothing;
}
});
}
}
}
//We're done receiving from the provider. No more chunks.
intermediateResponse.resolve(true);
return intermediateResponse.promise;
}
getIntermediateResponse() {
let intermediateResponse = {};
intermediateResponse.promise = new Promise(function (resolve, reject) {

View File

@ -24,8 +24,9 @@ import CouchObjectProvider from './CouchObjectProvider';
const NAMESPACE = '';
const PERSISTENCE_SPACE = 'mct';
export default function CouchPlugin(url) {
export default function CouchPlugin(options) {
return function install(openmct) {
openmct.objects.addProvider(PERSISTENCE_SPACE, new CouchObjectProvider(openmct, url, NAMESPACE));
install.couchProvider = new CouchObjectProvider(openmct, options, NAMESPACE);
openmct.objects.addProvider(PERSISTENCE_SPACE, install.couchProvider);
};
}

View File

@ -32,18 +32,42 @@ describe('the plugin', () => {
let child;
let provider;
let testPath = '/test/db';
let options;
let mockIdentifierService;
let mockDomainObject;
beforeEach((done) => {
mockDomainObject = {
identifier: {
namespace: 'mct',
namespace: '',
key: 'some-value'
},
type: 'mock-type'
};
options = {
url: testPath,
filter: {},
disableObserve: true
};
openmct = createOpenMct(false);
openmct.install(new CouchPlugin(testPath));
spyOnBuiltins(['fetch'], window);
openmct.$injector = jasmine.createSpyObj('$injector', ['get']);
mockIdentifierService = jasmine.createSpyObj(
'identifierService',
['parse']
);
mockIdentifierService.parse.and.returnValue({
getSpace: () => {
return 'mct';
}
});
openmct.$injector.get.and.returnValue(mockIdentifierService);
openmct.install(new CouchPlugin(options));
openmct.types.addType('mock-type', {creatable: true});
element = document.createElement('div');
@ -57,62 +81,67 @@ describe('the plugin', () => {
spyOn(provider, 'get').and.callThrough();
spyOn(provider, 'create').and.callThrough();
spyOn(provider, 'update').and.callThrough();
spyOnBuiltins(['fetch'], window);
fetch.and.returnValue(Promise.resolve({
json: () => {
return {
ok: true,
_id: 'some-value',
_rev: 1,
model: {}
};
}
}));
});
afterEach(() => {
return resetApplicationState(openmct);
});
it('gets an object', () => {
openmct.objects.get(mockDomainObject.identifier).then((result) => {
expect(result.identifier.key).toEqual(mockDomainObject.identifier.key);
describe('the provider', () => {
let mockPromise;
beforeEach(() => {
mockPromise = Promise.resolve({
json: () => {
return {
ok: true,
_id: 'some-value',
_rev: 1,
model: {}
};
}
});
fetch.and.returnValue(mockPromise);
});
});
it('creates an object', () => {
openmct.objects.save(mockDomainObject).then((result) => {
expect(provider.create).toHaveBeenCalled();
expect(result).toBeTrue();
});
});
it('updates an object', () => {
openmct.objects.save(mockDomainObject).then((result) => {
expect(result).toBeTrue();
expect(provider.create).toHaveBeenCalled();
openmct.objects.save(mockDomainObject).then((updatedResult) => {
expect(updatedResult).toBeTrue();
expect(provider.update).toHaveBeenCalled();
it('gets an object', () => {
openmct.objects.get(mockDomainObject.identifier).then((result) => {
expect(result.identifier.key).toEqual(mockDomainObject.identifier.key);
});
});
});
it('updates queued objects', () => {
let couchProvider = new CouchObjectProvider(openmct, 'http://localhost', '');
let intermediateResponse = couchProvider.getIntermediateResponse();
spyOn(couchProvider, 'updateQueued');
couchProvider.enqueueObject(mockDomainObject.identifier.key, mockDomainObject, intermediateResponse);
couchProvider.objectQueue[mockDomainObject.identifier.key].updateRevision(1);
couchProvider.update(mockDomainObject);
expect(couchProvider.objectQueue[mockDomainObject.identifier.key].hasNext()).toBe(2);
couchProvider.checkResponse({
ok: true,
rev: 2,
id: mockDomainObject.identifier.key
}, intermediateResponse);
it('creates an object', () => {
openmct.objects.save(mockDomainObject).then((result) => {
expect(provider.create).toHaveBeenCalled();
expect(result).toBeTrue();
});
});
expect(couchProvider.updateQueued).toHaveBeenCalledTimes(2);
it('updates an object', () => {
openmct.objects.save(mockDomainObject).then((result) => {
expect(result).toBeTrue();
expect(provider.create).toHaveBeenCalled();
openmct.objects.save(mockDomainObject).then((updatedResult) => {
expect(updatedResult).toBeTrue();
expect(provider.update).toHaveBeenCalled();
});
});
});
it('updates queued objects', () => {
let couchProvider = new CouchObjectProvider(openmct, options, '');
let intermediateResponse = couchProvider.getIntermediateResponse();
spyOn(couchProvider, 'updateQueued');
couchProvider.enqueueObject(mockDomainObject.identifier.key, mockDomainObject, intermediateResponse);
couchProvider.objectQueue[mockDomainObject.identifier.key].updateRevision(1);
couchProvider.update(mockDomainObject);
expect(couchProvider.objectQueue[mockDomainObject.identifier.key].hasNext()).toBe(2);
couchProvider.checkResponse({
ok: true,
rev: 2,
id: mockDomainObject.identifier.key
}, intermediateResponse);
expect(couchProvider.updateQueued).toHaveBeenCalledTimes(2);
});
});
});

View File

@ -63,7 +63,8 @@ define([
'./timeline/plugin',
'./viewDatumAction/plugin',
'./interceptors/plugin',
'./performanceIndicator/plugin'
'./performanceIndicator/plugin',
'./CouchDBSearchFolder/plugin'
], function (
_,
UTCTimeSystem,
@ -107,7 +108,8 @@ define([
Timeline,
ViewDatumAction,
ObjectInterceptors,
PerformanceIndicator
PerformanceIndicator,
CouchDBSearchFolder
) {
const bundleMap = {
LocalStorage: 'platform/persistence/local',
@ -206,6 +208,7 @@ define([
plugins.ViewDatumAction = ViewDatumAction.default;
plugins.ObjectInterceptors = ObjectInterceptors.default;
plugins.PerformanceIndicator = PerformanceIndicator.default;
plugins.CouchDBSearchFolder = CouchDBSearchFolder.default;
return plugins;
});

View File

@ -61,6 +61,11 @@ export default {
this.openmct.time.on("timeSystem", this.setScaleAndPlotActivities);
this.openmct.time.on("bounds", this.updateViewBounds);
this.resizeTimer = setInterval(this.resize, RESIZE_POLL_INTERVAL);
if (this.openmct.objects.supportsMutation(this.domainObject.identifier)) {
this.openmct.objects.getMutable(this.domainObject.identifier)
.then(this.observeForChanges);
}
this.unlisten = this.openmct.objects.observe(this.domainObject, '*', this.observeForChanges);
},
destroyed() {
@ -73,8 +78,10 @@ export default {
},
methods: {
observeForChanges(mutatedObject) {
this.validateJSON(mutatedObject.selectFile.body);
this.setScaleAndPlotActivities();
if (mutatedObject.selectFile) {
this.validateJSON(mutatedObject.selectFile.body);
this.setScaleAndPlotActivities();
}
},
resize() {
if (this.$refs.axisHolder.clientWidth !== this.width) {

View File

@ -43,7 +43,7 @@ define([
mutable = undefined;
}
if (openmct.objects.supportsMutation(object)) {
if (openmct.objects.supportsMutation(object.identifier)) {
mutable = openmct.objects._toMutable(object);
}