mirror of
https://github.com/nasa/openmct.git
synced 2025-06-15 05:38:12 +00:00
[Persistence] Begin integrating persistence queue
Begin integrating persistence queue, tweaking for issues detected through minimal use. WTD-1033.
This commit is contained in:
@ -13,6 +13,7 @@
|
|||||||
"platform/features/scrolling",
|
"platform/features/scrolling",
|
||||||
"platform/forms",
|
"platform/forms",
|
||||||
"platform/persistence/cache",
|
"platform/persistence/cache",
|
||||||
|
"platform/persistence/queue",
|
||||||
"platform/persistence/elastic",
|
"platform/persistence/elastic",
|
||||||
|
|
||||||
"example/generator"
|
"example/generator"
|
||||||
|
@ -38,8 +38,8 @@ define(
|
|||||||
data: value
|
data: value
|
||||||
}).then(function (response) {
|
}).then(function (response) {
|
||||||
return response.data;
|
return response.data;
|
||||||
}, function () {
|
}, function (response) {
|
||||||
return undefined;
|
return (response || {}).data;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,7 +91,7 @@ define(
|
|||||||
function checkResponse(response, key) {
|
function checkResponse(response, key) {
|
||||||
var error;
|
var error;
|
||||||
if (response && !response.error) {
|
if (response && !response.error) {
|
||||||
revs[ID] = response[REV];
|
revs[key] = response[REV];
|
||||||
return response;
|
return response;
|
||||||
} else {
|
} else {
|
||||||
return handleError(response, key);
|
return handleError(response, key);
|
||||||
|
@ -39,6 +39,7 @@ define(
|
|||||||
) {
|
) {
|
||||||
// Wire up injected dependencies
|
// Wire up injected dependencies
|
||||||
return new PersistenceQueueImpl(
|
return new PersistenceQueueImpl(
|
||||||
|
$q,
|
||||||
$timeout,
|
$timeout,
|
||||||
new PersistenceQueueHandler(
|
new PersistenceQueueHandler(
|
||||||
$q,
|
$q,
|
||||||
|
@ -21,12 +21,13 @@ define(
|
|||||||
* @param {number} [DELAY] optional; delay in milliseconds between
|
* @param {number} [DELAY] optional; delay in milliseconds between
|
||||||
* attempts to flush the queue
|
* attempts to flush the queue
|
||||||
*/
|
*/
|
||||||
function PersistenceQueueImpl($timeout, handler, DELAY) {
|
function PersistenceQueueImpl($q, $timeout, handler, DELAY) {
|
||||||
var queue = {},
|
var queue = {},
|
||||||
objects = {},
|
objects = {},
|
||||||
lastObservedSize = 0,
|
lastObservedSize = 0,
|
||||||
pendingTimeout,
|
pendingTimeout,
|
||||||
flushPromise;
|
flushPromise,
|
||||||
|
activeDefer = $q.defer();
|
||||||
|
|
||||||
// Check if the queue's size has stopped increasing)
|
// Check if the queue's size has stopped increasing)
|
||||||
function quiescent() {
|
function quiescent() {
|
||||||
@ -39,8 +40,9 @@ define(
|
|||||||
flushPromise = handler.persist(queue, objects);
|
flushPromise = handler.persist(queue, objects);
|
||||||
|
|
||||||
// When persisted, clear the active promise
|
// When persisted, clear the active promise
|
||||||
flushPromise.then(function () {
|
flushPromise.then(function (value) {
|
||||||
flushPromise = undefined;
|
flushPromise = undefined;
|
||||||
|
activeDefer.resolve(value);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Reset queue, etc.
|
// Reset queue, etc.
|
||||||
@ -48,14 +50,19 @@ define(
|
|||||||
objects = {};
|
objects = {};
|
||||||
lastObservedSize = 0;
|
lastObservedSize = 0;
|
||||||
pendingTimeout = undefined;
|
pendingTimeout = undefined;
|
||||||
|
activeDefer = $q.defer();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule a flushing of the queue (that is, plan to flush
|
// Schedule a flushing of the queue (that is, plan to flush
|
||||||
// all objects in the queue)
|
// all objects in the queue)
|
||||||
function scheduleFlush() {
|
function scheduleFlush() {
|
||||||
function maybeFlush() {
|
function maybeFlush() {
|
||||||
|
// Timeout fired, so clear it
|
||||||
|
pendingTimeout = undefined;
|
||||||
// Only flush when we've stopped receiving updates
|
// Only flush when we've stopped receiving updates
|
||||||
(quiescent() ? flush : scheduleFlush)();
|
(quiescent() ? flush : scheduleFlush)();
|
||||||
|
// Update lastObservedSize to detect quiescence
|
||||||
|
lastObservedSize = Object.keys(queue).length;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are already flushing the queue...
|
// If we are already flushing the queue...
|
||||||
@ -68,6 +75,8 @@ define(
|
|||||||
pendingTimeout = pendingTimeout ||
|
pendingTimeout = pendingTimeout ||
|
||||||
$timeout(maybeFlush, DELAY, false);
|
$timeout(maybeFlush, DELAY, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return activeDefer.promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no delay is provided, use a default
|
// If no delay is provided, use a default
|
||||||
@ -84,7 +93,7 @@ define(
|
|||||||
var id = domainObject.getId();
|
var id = domainObject.getId();
|
||||||
queue[id] = persistence;
|
queue[id] = persistence;
|
||||||
objects[id] = domainObject;
|
objects[id] = domainObject;
|
||||||
scheduleFlush();
|
return scheduleFlush();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -15,12 +15,11 @@ define(
|
|||||||
* the capability
|
* the capability
|
||||||
*/
|
*/
|
||||||
function QueuingPersistenceCapability(queue, persistence, domainObject) {
|
function QueuingPersistenceCapability(queue, persistence, domainObject) {
|
||||||
var queuingPersistence = Object.create(persistence),
|
var queuingPersistence = Object.create(persistence);
|
||||||
id = domainObject.getId();
|
|
||||||
|
|
||||||
// Override persist calls to queue them instead
|
// Override persist calls to queue them instead
|
||||||
queuingPersistence.persist = function () {
|
queuingPersistence.persist = function () {
|
||||||
queue.put(id, persistence);
|
return queue.put(domainObject, persistence);
|
||||||
};
|
};
|
||||||
|
|
||||||
return queuingPersistence;
|
return queuingPersistence;
|
||||||
|
@ -43,8 +43,9 @@ define(
|
|||||||
}
|
}
|
||||||
|
|
||||||
function getCapabilities(model) {
|
function getCapabilities(model) {
|
||||||
return capabilityService.getCapabilities(model)
|
return decoratePersistence(
|
||||||
.then(decoratePersistence);
|
capabilityService.getCapabilities(model)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
Reference in New Issue
Block a user