[Search] Vary batch size

When indexing for generic search, issue new batches of
requests as individual requests finish (instead of waiting
for whole batches to finish) varying size to keep the
number of outstanding requests below some maximum.

nasa/openmctweb#141
This commit is contained in:
Victor Woeltjen 2015-09-29 18:35:55 -07:00
parent 85ac4a9a32
commit 78f3f8367e

View File

@ -31,8 +31,8 @@ define(
var DEFAULT_MAX_RESULTS = 100, var DEFAULT_MAX_RESULTS = 100,
DEFAULT_TIMEOUT = 1000, DEFAULT_TIMEOUT = 1000,
FLUSH_SIZE = 24, MAX_CONCURRENT_REQUESTS = 100,
FLUSH_INTERVAL = 25, FLUSH_INTERVAL = 180,
stopTime; stopTime;
/** /**
@ -53,10 +53,11 @@ define(
function GenericSearchProvider($q, $log, throttle, objectService, workerService, topic, ROOTS) { function GenericSearchProvider($q, $log, throttle, objectService, workerService, topic, ROOTS) {
var indexed = {}, var indexed = {},
pendingQueries = {}, pendingQueries = {},
toIndex = {}, toRequest = {},
worker = workerService.run('genericSearchWorker'), worker = workerService.run('genericSearchWorker'),
mutationTopic = topic("mutation"), mutationTopic = topic("mutation"),
indexingStarted = Date.now(), indexingStarted = Date.now(),
pendingRequests = 0,
scheduleFlush; scheduleFlush;
this.worker = worker; this.worker = worker;
@ -69,7 +70,7 @@ define(
ids.forEach(function (id) { ids.forEach(function (id) {
if (!indexed[id]) { if (!indexed[id]) {
indexed[id] = true; indexed[id] = true;
toIndex[id] = true; toRequest[id] = true;
} }
}); });
scheduleFlush(); scheduleFlush();
@ -125,32 +126,36 @@ define(
} }
} }
scheduleFlush = throttle(function flush() { function requestAndIndex(id) {
var ids = Object.keys(toIndex).slice(0, FLUSH_SIZE); delete toRequest[id];
pendingRequests += 1;
// Don't need to look these up next time objectService.getObjects([id]).then(function (objects) {
ids.forEach(function (id) { if (objects[id]) {
delete toIndex[id]; indexItem(objects[id]);
}
}, function () {
$log.warn("Failed to index domain object " + id);
}).then(function () {
pendingRequests -= 1;
scheduleFlush();
}); });
}
if (ids.length < 1) { scheduleFlush = throttle(function flush() {
var batchSize =
Math.max(MAX_CONCURRENT_REQUESTS - pendingRequests, 0);
if (Object.keys(toRequest).length + pendingRequests < 1) {
$log.info([ $log.info([
'GenericSearch finished indexing after ', 'GenericSearch finished indexing after ',
((Date.now() - indexingStarted) / 1000).toFixed(2), ((Date.now() - indexingStarted) / 1000).toFixed(2),
' seconds.' ' seconds.'
].join('')); ].join(''));
return; } else {
Object.keys(toRequest)
.slice(0, batchSize)
.forEach(requestAndIndex);
} }
objectService.getObjects(ids).then(function (objects) {
ids.map(function (id) {
return objects[id];
}).filter(function (object) {
return object;
}).forEach(indexItem);
scheduleFlush();
});
}, FLUSH_INTERVAL); }, FLUSH_INTERVAL);
worker.onmessage = handleResponse; worker.onmessage = handleResponse;