diff --git a/example/profiling/bundle.json b/example/profiling/bundle.json index b6090717d2..25c1b10749 100644 --- a/example/profiling/bundle.json +++ b/example/profiling/bundle.json @@ -4,7 +4,11 @@ { "implementation": "WatchIndicator.js", "depends": ["$interval", "$rootScope"] + }, + { + "implementation": "DigestIndicator.js", + "depends": ["$interval", "$rootScope"] } ] } -} \ No newline at end of file +} diff --git a/example/profiling/src/DigestIndicator.js b/example/profiling/src/DigestIndicator.js new file mode 100644 index 0000000000..02fbc7a08b --- /dev/null +++ b/example/profiling/src/DigestIndicator.js @@ -0,0 +1,77 @@ +/***************************************************************************** + * Open MCT Web, Copyright (c) 2014-2015, United States Government + * as represented by the Administrator of the National Aeronautics and Space + * Administration. All rights reserved. + * + * Open MCT Web is licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * Open MCT Web includes source code licensed under additional open source + * licenses. See the Open Source Licenses file (LICENSES.md) included with + * this source code distribution or the Licensing information page available + * at runtime from the About dialog for additional information. + *****************************************************************************/ +/*global define*/ + +define( + [], + function () { + "use strict"; + + /** + * Displays the number of digests that have occurred since the + * indicator was first instantiated. + * @constructor + * @param $interval Angular's $interval + * @implements {Indicator} + */ + function DigestIndicator($interval, $rootScope) { + var digests = 0, + displayed = 0, + start = Date.now(); + + function update() { + var secs = (Date.now() - start) / 1000; + displayed = Math.round(digests / secs); + } + + function increment() { + digests += 1; + } + + $rootScope.$watch(increment); + + // Update state every second + $interval(update, 1000); + + // Provide initial state, too + update(); + + return { + getGlyph: function () { + return "."; + }, + getGlyphClass: function () { + return undefined; + }, + getText: function () { + return displayed + " digests/sec"; + }, + getDescription: function () { + return ""; + } + }; + } + + return DigestIndicator; + + } +); diff --git a/platform/core/src/capabilities/MutationCapability.js b/platform/core/src/capabilities/MutationCapability.js index b9f49ca969..1f08abda6b 100644 --- a/platform/core/src/capabilities/MutationCapability.js +++ b/platform/core/src/capabilities/MutationCapability.js @@ -29,7 +29,8 @@ define( function () { "use strict"; - var TOPIC_PREFIX = "mutation:"; + var GENERAL_TOPIC = "mutation", + TOPIC_PREFIX = "mutation:"; // Utility function to overwrite a destination object // with the contents of a source object. @@ -78,7 +79,11 @@ define( * @implements {Capability} */ function MutationCapability(topic, now, domainObject) { - this.mutationTopic = topic(TOPIC_PREFIX + domainObject.getId()); + this.generalMutationTopic = + topic(GENERAL_TOPIC); + this.specificMutationTopic = + topic(TOPIC_PREFIX + domainObject.getId()); + this.now = now; this.domainObject = domainObject; } @@ -115,11 +120,17 @@ define( // mutator function has a temporary copy to work with. var domainObject = this.domainObject, now = this.now, - t = this.mutationTopic, + generalTopic = this.generalMutationTopic, + specificTopic = this.specificMutationTopic, model = domainObject.getModel(), clone = JSON.parse(JSON.stringify(model)), useTimestamp = arguments.length > 1; + function notifyListeners(model) { + generalTopic.notify(domainObject); + specificTopic.notify(model); + } + // Function to handle copying values to the actual function handleMutation(mutationResult) { // If mutation result was undefined, just use @@ -136,7 +147,7 @@ define( copyValues(model, result); } model.modified = useTimestamp ? timestamp : now(); - t.notify(model); + notifyListeners(model); } // Report the result of the mutation @@ -158,7 +169,7 @@ define( * @memberof platform/core.MutationCapability# */ MutationCapability.prototype.listen = function (listener) { - return this.mutationTopic.listen(listener); + return this.specificMutationTopic.listen(listener); }; /** diff --git a/platform/core/src/services/Throttle.js b/platform/core/src/services/Throttle.js index 60444ad6c4..4b1ad32530 100644 --- a/platform/core/src/services/Throttle.js +++ b/platform/core/src/services/Throttle.js @@ -61,7 +61,7 @@ define( * @memberof platform/core.Throttle# */ return function (fn, delay, apply) { - var promise, // Promise for the result of throttled function + var promise, args = []; function invoke() { diff --git a/platform/search/bundle.json b/platform/search/bundle.json index 7ea1536556..67dc058ed9 100644 --- a/platform/search/bundle.json +++ b/platform/search/bundle.json @@ -45,7 +45,15 @@ "provides": "searchService", "type": "provider", "implementation": "services/GenericSearchProvider.js", - "depends": [ "$q", "$timeout", "objectService", "workerService", "GENERIC_SEARCH_ROOTS" ] + "depends": [ + "$q", + "$log", + "throttle", + "objectService", + "workerService", + "topic", + "GENERIC_SEARCH_ROOTS" + ] }, { "provides": "searchService", @@ -61,4 +69,4 @@ } ] } -} \ No newline at end of file +} diff --git a/platform/search/src/services/GenericSearchProvider.js b/platform/search/src/services/GenericSearchProvider.js index 014d8d7fda..9574d156fb 100644 --- a/platform/search/src/services/GenericSearchProvider.js +++ b/platform/search/src/services/GenericSearchProvider.js @@ -28,61 +28,78 @@ define( [], function () { "use strict"; - + var DEFAULT_MAX_RESULTS = 100, DEFAULT_TIMEOUT = 1000, + MAX_CONCURRENT_REQUESTS = 100, + FLUSH_INTERVAL = 0, stopTime; - + /** - * A search service which searches through domain objects in + * A search service which searches through domain objects in * the filetree without using external search implementations. * * @constructor * @param $q Angular's $q, for promise consolidation. - * @param $timeout Angular's $timeout, for delayed function execution. + * @param $log Anglar's $log, for logging. + * @param {Function} throttle a function to throttle function invocations * @param {ObjectService} objectService The service from which * domain objects can be gotten. * @param {WorkerService} workerService The service which allows * more easy creation of web workers. - * @param {GENERIC_SEARCH_ROOTS} ROOTS An array of the root + * @param {GENERIC_SEARCH_ROOTS} ROOTS An array of the root * domain objects' IDs. */ - function GenericSearchProvider($q, $timeout, objectService, workerService, ROOTS) { + function GenericSearchProvider($q, $log, throttle, objectService, workerService, topic, ROOTS) { var indexed = {}, + pendingIndex = {}, pendingQueries = {}, - worker = workerService.run('genericSearchWorker'); + toRequest = [], + worker = workerService.run('genericSearchWorker'), + mutationTopic = topic("mutation"), + indexingStarted = Date.now(), + pendingRequests = 0, + scheduleFlush; this.worker = worker; this.pendingQueries = pendingQueries; this.$q = $q; // pendingQueries is a dictionary with the key value pairs st // the key is the timestamp and the value is the promise - + + function scheduleIdsForIndexing(ids) { + ids.forEach(function (id) { + if (!indexed[id] && !pendingIndex[id]) { + indexed[id] = true; + pendingIndex[id] = true; + toRequest.push(id); + } + }); + scheduleFlush(); + } + // Tell the web worker to add a domain object's model to its list of items. function indexItem(domainObject) { - var message; - - // undefined check - if (domainObject && domainObject.getModel) { - // Using model instead of whole domain object because - // it's a JSON object. - message = { - request: 'index', - model: domainObject.getModel(), - id: domainObject.getId() - }; - worker.postMessage(message); + var model = domainObject.getModel(); + + worker.postMessage({ + request: 'index', + model: model, + id: domainObject.getId() + }); + + if (Array.isArray(model.composition)) { + scheduleIdsForIndexing(model.composition); } } - - // Handles responses from the web worker. Namely, the results of - // a search request. + // Handles responses from the web worker. Namely, the results of + // a search request. function handleResponse(event) { var ids = [], id; - - // If we have the results from a search + + // If we have the results from a search if (event.data.request === 'search') { // Convert the ids given from the web worker into domain objects for (id in event.data.results) { @@ -91,7 +108,7 @@ define( objectService.getObjects(ids).then(function (objects) { var searchResults = [], id; - + // Create searchResult objects for (id in objects) { searchResults.push({ @@ -100,8 +117,8 @@ define( score: event.data.results[id] }); } - - // Resove the promise corresponding to this + + // Resove the promise corresponding to this pendingQueries[event.data.timestamp].resolve({ hits: searchResults, total: event.data.total, @@ -110,83 +127,49 @@ define( }); } } - - // Helper function for getItems(). Indexes the tree. - function indexItems(nodes) { - nodes.forEach(function (node) { - var id = node && node.getId && node.getId(); - - // If we have already indexed this item, stop here - if (indexed[id]) { - return; - } - - // Index each item with the web worker - indexItem(node); - indexed[id] = true; - - - // If this node has children, index those - if (node && node.hasCapability && node.hasCapability('composition')) { - // Make sure that this is async, so doesn't block up page - $timeout(function () { - // Get the children... - node.useCapability('composition').then(function (children) { - $timeout(function () { - // ... then index the children - if (children.constructor === Array) { - indexItems(children); - } else { - indexItems([children]); - } - }, 0); - }); - }, 0); - } - - // Watch for changes to this item, in case it gets new children - if (node && node.hasCapability && node.hasCapability('mutation')) { - node.getCapability('mutation').listen(function (listener) { - if (listener && listener.composition) { - // If the node was mutated to have children, get the child domain objects - objectService.getObjects(listener.composition).then(function (objectsById) { - var objects = [], - id; - // Get each of the domain objects in objectsById - for (id in objectsById) { - objects.push(objectsById[id]); - } - - indexItems(objects); - }); - } - }); + function requestAndIndex(id) { + pendingRequests += 1; + objectService.getObjects([id]).then(function (objects) { + delete pendingIndex[id]; + if (objects[id]) { + indexItem(objects[id]); } + }, function () { + $log.warn("Failed to index domain object " + id); + }).then(function () { + pendingRequests -= 1; + scheduleFlush(); }); } - - // Converts the filetree into a list - function getItems() { - // Aquire root objects - objectService.getObjects(ROOTS).then(function (objectsById) { - var objects = [], - id; - - // Get each of the domain objects in objectsById - for (id in objectsById) { - objects.push(objectsById[id]); - } - - // Index all of the roots' descendents - indexItems(objects); - }); - } + + scheduleFlush = throttle(function flush() { + var batchSize = + Math.max(MAX_CONCURRENT_REQUESTS - pendingRequests, 0); + + if (toRequest.length + pendingRequests < 1) { + $log.info([ + 'GenericSearch finished indexing after ', + ((Date.now() - indexingStarted) / 1000).toFixed(2), + ' seconds.' + ].join('')); + } else { + toRequest.splice(-batchSize, batchSize) + .forEach(requestAndIndex); + } + }, FLUSH_INTERVAL); worker.onmessage = handleResponse; - // Index the tree's contents once at the beginning - getItems(); + // Index the tree's contents once at the beginning + scheduleIdsForIndexing(ROOTS); + + // Re-index items when they are mutated + mutationTopic.listen(function (domainObject) { + var id = domainObject.getId(); + indexed[id] = false; + scheduleIdsForIndexing([id]); + }); } /** @@ -266,4 +249,4 @@ define( return GenericSearchProvider; } -); \ No newline at end of file +); diff --git a/platform/search/test/services/GenericSearchProviderSpec.js b/platform/search/test/services/GenericSearchProviderSpec.js index 2da7cd343b..e3ee0a97ba 100644 --- a/platform/search/test/services/GenericSearchProviderSpec.js +++ b/platform/search/test/services/GenericSearchProviderSpec.js @@ -31,35 +31,67 @@ define( describe("The generic search provider ", function () { var mockQ, - mockTimeout, + mockLog, + mockThrottle, mockDeferred, mockObjectService, mockObjectPromise, + mockChainedPromise, mockDomainObjects, mockCapability, mockCapabilityPromise, mockWorkerService, mockWorker, + mockTopic, + mockMutationTopic, mockRoots = ['root1', 'root2'], + mockThrottledFn, + throttledCallCount, provider, mockProviderResults; - beforeEach(function () { + function resolveObjectPromises() { var i; - + for (i = 0; i < mockObjectPromise.then.calls.length; i += 1) { + mockChainedPromise.then.calls[i].args[0]( + mockObjectPromise.then.calls[i] + .args[0](mockDomainObjects) + ); + } + } + + function resolveThrottledFn() { + if (mockThrottledFn.calls.length > throttledCallCount) { + mockThrottle.mostRecentCall.args[0](); + throttledCallCount = mockThrottledFn.calls.length; + } + } + + function resolveAsyncTasks() { + resolveThrottledFn(); + resolveObjectPromises(); + } + + beforeEach(function () { mockQ = jasmine.createSpyObj( "$q", [ "defer" ] ); + mockLog = jasmine.createSpyObj( + "$log", + [ "error", "warn", "info", "debug" ] + ); mockDeferred = jasmine.createSpyObj( "deferred", [ "resolve", "reject"] ); mockDeferred.promise = "mock promise"; mockQ.defer.andReturn(mockDeferred); - - mockTimeout = jasmine.createSpy("$timeout"); - + + mockThrottle = jasmine.createSpy("throttle"); + mockThrottledFn = jasmine.createSpy("throttledFn"); + throttledCallCount = 0; + mockObjectService = jasmine.createSpyObj( "objectService", [ "getObjects" ] @@ -68,9 +100,14 @@ define( "promise", [ "then", "catch" ] ); + mockChainedPromise = jasmine.createSpyObj( + "chainedPromise", + [ "then" ] + ); mockObjectService.getObjects.andReturn(mockObjectPromise); - - + + mockTopic = jasmine.createSpy('topic'); + mockWorkerService = jasmine.createSpyObj( "workerService", [ "run" ] @@ -80,68 +117,109 @@ define( [ "postMessage" ] ); mockWorkerService.run.andReturn(mockWorker); - + mockCapabilityPromise = jasmine.createSpyObj( "promise", [ "then", "catch" ] ); - + mockDomainObjects = {}; - for (i = 0; i < 4; i += 1) { - mockDomainObjects[i] = ( + ['a', 'root1', 'root2'].forEach(function (id) { + mockDomainObjects[id] = ( jasmine.createSpyObj( "domainObject", - [ "getId", "getModel", "hasCapability", "getCapability", "useCapability" ] + [ + "getId", + "getModel", + "hasCapability", + "getCapability", + "useCapability" + ] ) ); - mockDomainObjects[i].getId.andReturn(i); - mockDomainObjects[i].getCapability.andReturn(mockCapability); - mockDomainObjects[i].useCapability.andReturn(mockCapabilityPromise); - } - // Give the first object children - mockDomainObjects[0].hasCapability.andReturn(true); + mockDomainObjects[id].getId.andReturn(id); + mockDomainObjects[id].getCapability.andReturn(mockCapability); + mockDomainObjects[id].useCapability.andReturn(mockCapabilityPromise); + mockDomainObjects[id].getModel.andReturn({}); + }); + mockCapability = jasmine.createSpyObj( "capability", [ "invoke", "listen" ] ); mockCapability.invoke.andReturn(mockCapabilityPromise); - mockDomainObjects[0].getCapability.andReturn(mockCapability); - - provider = new GenericSearchProvider(mockQ, mockTimeout, mockObjectService, mockWorkerService, mockRoots); + mockDomainObjects.a.getCapability.andReturn(mockCapability); + mockMutationTopic = jasmine.createSpyObj( + 'mutationTopic', + [ 'listen' ] + ); + mockTopic.andCallFake(function (key) { + return key === 'mutation' && mockMutationTopic; + }); + mockThrottle.andReturn(mockThrottledFn); + mockObjectPromise.then.andReturn(mockChainedPromise); + + provider = new GenericSearchProvider( + mockQ, + mockLog, + mockThrottle, + mockObjectService, + mockWorkerService, + mockTopic, + mockRoots + ); }); - + it("indexes tree on initialization", function () { + var i; + + resolveThrottledFn(); + expect(mockObjectService.getObjects).toHaveBeenCalled(); expect(mockObjectPromise.then).toHaveBeenCalled(); - - // Call through the root-getting part - mockObjectPromise.then.mostRecentCall.args[0](mockDomainObjects); - - // Call through the children-getting part - mockTimeout.mostRecentCall.args[0](); - // Array argument indicates multiple children - mockCapabilityPromise.then.mostRecentCall.args[0]([]); - mockTimeout.mostRecentCall.args[0](); - // Call again, but for single child - mockCapabilityPromise.then.mostRecentCall.args[0]({}); - mockTimeout.mostRecentCall.args[0](); - - expect(mockWorker.postMessage).toHaveBeenCalled(); + + // Call through the root-getting part + resolveObjectPromises(); + + mockRoots.forEach(function (id) { + expect(mockWorker.postMessage).toHaveBeenCalledWith({ + request: 'index', + model: mockDomainObjects[id].getModel(), + id: id + }); + }); }); - - it("when indexing, listens for composition changes", function () { - var mockListener = {composition: {}}; - - // Call indexItems - mockObjectPromise.then.mostRecentCall.args[0](mockDomainObjects); - - // Call through listening for changes - expect(mockCapability.listen).toHaveBeenCalled(); - mockCapability.listen.mostRecentCall.args[0](mockListener); - expect(mockObjectService.getObjects).toHaveBeenCalled(); - mockObjectPromise.then.mostRecentCall.args[0](mockDomainObjects); + + it("indexes members of composition", function () { + mockDomainObjects.root1.getModel.andReturn({ + composition: ['a'] + }); + + resolveAsyncTasks(); + resolveAsyncTasks(); + + expect(mockWorker.postMessage).toHaveBeenCalledWith({ + request: 'index', + model: mockDomainObjects.a.getModel(), + id: 'a' + }); }); - + + it("listens for changes to mutation", function () { + expect(mockMutationTopic.listen) + .toHaveBeenCalledWith(jasmine.any(Function)); + mockMutationTopic.listen.mostRecentCall + .args[0](mockDomainObjects.a); + + resolveAsyncTasks(); + + expect(mockWorker.postMessage).toHaveBeenCalledWith({ + request: 'index', + model: mockDomainObjects.a.getModel(), + id: mockDomainObjects.a.getId() + }); + }); + it("sends search queries to the worker", function () { var timestamp = Date.now(); provider.query(' test "query" ', timestamp, 1, 2); @@ -153,20 +231,20 @@ define( timeout: 2 }); }); - + it("gives an empty result for an empty query", function () { var timestamp = Date.now(), queryOutput; - + queryOutput = provider.query('', timestamp, 1, 2); expect(queryOutput.hits).toEqual([]); expect(queryOutput.total).toEqual(0); - + queryOutput = provider.query(); expect(queryOutput.hits).toEqual([]); expect(queryOutput.total).toEqual(0); }); - + it("handles responses from the worker", function () { var timestamp = Date.now(), event = { @@ -181,13 +259,35 @@ define( timestamp: timestamp } }; - + provider.query(' test "query" ', timestamp); mockWorker.onmessage(event); mockObjectPromise.then.mostRecentCall.args[0](mockDomainObjects); expect(mockDeferred.resolve).toHaveBeenCalled(); }); - + + it("warns when objects are unavailable", function () { + resolveAsyncTasks(); + expect(mockLog.warn).not.toHaveBeenCalled(); + mockChainedPromise.then.mostRecentCall.args[0]( + mockObjectPromise.then.mostRecentCall.args[1]() + ); + expect(mockLog.warn).toHaveBeenCalled(); + }); + + it("throttles the loading of objects to index", function () { + expect(mockObjectService.getObjects).not.toHaveBeenCalled(); + resolveThrottledFn(); + expect(mockObjectService.getObjects).toHaveBeenCalled(); + }); + + it("logs when all objects have been processed", function () { + expect(mockLog.info).not.toHaveBeenCalled(); + resolveAsyncTasks(); + resolveThrottledFn(); + expect(mockLog.info).toHaveBeenCalled(); + }); + }); } -); \ No newline at end of file +);