From 6b43256afd2fb8655feb4fb445f13549abb4c65c Mon Sep 17 00:00:00 2001 From: Victor Woeltjen Date: Tue, 24 Mar 2015 14:09:51 -0700 Subject: [PATCH] [Persistence] Requeue on overwrite Requeue (instead of trying to access persistence again) on overwrite WTD-1033. --- .../queue/src/PersistenceFailureHandler.js | 13 ++++------ .../queue/src/PersistenceQueueHandler.js | 24 +++++++++++++------ .../queue/src/PersistenceQueueImpl.js | 18 +++++++------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/platform/persistence/queue/src/PersistenceFailureHandler.js b/platform/persistence/queue/src/PersistenceFailureHandler.js index 3efde3d195..9175b2f85e 100644 --- a/platform/persistence/queue/src/PersistenceFailureHandler.js +++ b/platform/persistence/queue/src/PersistenceFailureHandler.js @@ -16,14 +16,11 @@ define( // Issue a new persist call for the domain object associated with // this failure. function persist(failure) { - var decoratedPersistence = - failure.domainObject.getCapability('persistence'); - // Note that we issue the persist request here, but don't - // return it. We trust that the PersistenceQueue will - // behave correctly on the next round of flushing. - if (decoratedPersistence) { - decoratedPersistence.persist(); - } + // Note that we reissue the persist request here, but don't + // return it, to avoid a circular wait. We trust that the + // PersistenceQueue will behave correctly on the next round + // of flushing. + failure.requeue(); } // Retry persistence (overwrite) for this set of failed attempts diff --git a/platform/persistence/queue/src/PersistenceQueueHandler.js b/platform/persistence/queue/src/PersistenceQueueHandler.js index 09976b7ff0..d56f04b686 100644 --- a/platform/persistence/queue/src/PersistenceQueueHandler.js +++ b/platform/persistence/queue/src/PersistenceQueueHandler.js @@ -17,14 +17,21 @@ define( function PersistenceQueueHandler($q, failureHandler) { // Handle a group of persistence invocations - function persistGroup(ids, queue, domainObjects) { + function persistGroup(ids, persistences, domainObjects, queue) { var failures = []; // Try to persist a specific domain object function tryPersist(id) { // Look up its persistence capability from the provided // id->persistence object. - var persistence = queue[id]; + var persistence = persistences[id], + domainObject = domainObjects[id]; + + // Put a domain object back in the queue + // (e.g. after Overwrite) + function requeue() { + return queue.put(domainObject, persistence); + } // Handle success function succeed(value) { @@ -36,7 +43,8 @@ define( failures.push({ id: id, persistence: persistence, - domainObject: domainObjects[id], + domainObject: domainObject, + requeue: requeue, error: error }); return false; @@ -63,14 +71,16 @@ define( /** * Invoke the persist method on the provided persistence * capabilities. - * @param {Object.} queue + * @param {Object.} persistences * capabilities to invoke, in id->capability pairs. * @param {Object.} domainObjects * associated domain objects, in id->object pairs. + * @param {PersistenceQueue} queue the persistence queue, + * to requeue as necessary */ - persist: function (queue, domainObjects) { - var ids = Object.keys(queue); - return persistGroup(ids, queue, domainObjects); + persist: function (persistences, domainObjects, queue) { + var ids = Object.keys(persistences); + return persistGroup(ids, persistences, domainObjects, queue); } }; } diff --git a/platform/persistence/queue/src/PersistenceQueueImpl.js b/platform/persistence/queue/src/PersistenceQueueImpl.js index 94393bf2e1..fdc68e6725 100644 --- a/platform/persistence/queue/src/PersistenceQueueImpl.js +++ b/platform/persistence/queue/src/PersistenceQueueImpl.js @@ -22,7 +22,8 @@ define( * attempts to flush the queue */ function PersistenceQueueImpl($q, $timeout, handler, DELAY) { - var queue = {}, + var self, + persistences = {}, objects = {}, lastObservedSize = 0, pendingTimeout, @@ -31,10 +32,9 @@ define( // Check if the queue's size has stopped increasing) function quiescent() { - return Object.keys(queue).length === lastObservedSize; + return Object.keys(persistences).length === lastObservedSize; } - // Persist all queued objects function flush() { // Get a local reference to the active promise; @@ -51,11 +51,11 @@ define( } // Persist all queued objects - flushPromise = handler.persist(queue, objects) + flushPromise = handler.persist(persistences, objects, self) .then(clearFlushPromise, clearFlushPromise); // Reset queue, etc. - queue = {}; + persistences = {}; objects = {}; lastObservedSize = 0; pendingTimeout = undefined; @@ -71,7 +71,7 @@ define( // Only flush when we've stopped receiving updates (quiescent() ? flush : scheduleFlush)(); // Update lastObservedSize to detect quiescence - lastObservedSize = Object.keys(queue).length; + lastObservedSize = Object.keys(persistences).length; } // If we are already flushing the queue... @@ -91,7 +91,7 @@ define( // If no delay is provided, use a default DELAY = DELAY || 0; - return { + self = { /** * Queue persistence of a domain object. * @param {DomainObject} domainObject the domain object @@ -100,11 +100,13 @@ define( */ put: function (domainObject, persistence) { var id = domainObject.getId(); - queue[id] = persistence; + persistences[id] = persistence; objects[id] = domainObject; return scheduleFlush(); } }; + + return self; } return PersistenceQueueImpl;