From 759419fcc146250f42b16c2cde9a817534c61a04 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 23 Aug 2018 11:06:14 +0100 Subject: [PATCH] ENT-2355 Insert transactions without checking for duplicates (#1350) * ENT-2355 Make transactions storage optimistic if in a flow and the flow has never restored from a checkpoint. (cherry picked from commit 9a2e9b0) * ENT-2355 Insert transactions with optimism. * Added some comments. --- .../services/persistence/DBTransactionStorage.kt | 12 ++++++++++-- .../services/statemachine/FlowStateMachineImpl.kt | 3 +++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 3b8246c47f..dcb171c820 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -101,8 +101,16 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers override fun addTransaction(transaction: SignedTransaction): Boolean = database.transaction { txStorage.concurrent { - addWithDuplicatesAllowed(transaction.id, transaction.toTxCacheValue()).apply { - updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction) + // Be optimistic if we are a flow and never restarted (i.e. cannot have been to the flow hospital due to primary key collision). + val optimistic = FlowStateMachineImpl.currentStateMachine()?.isNotRestarted ?: false + if (optimistic) { + set(transaction.id, transaction.toTxCacheValue()).apply { + updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction) + } + } else { + addWithDuplicatesAllowed(transaction.id, transaction.toTxCacheValue()).apply { + updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction) + } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index a5d48cb57e..ba73c6eedf 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -87,6 +87,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, internal var transientValues: TransientReference? = null internal var transientState: TransientReference? = null + // Utilise the behaviour of `ourSenderUUID` to indicate whether we have restarted ever. Can be used to make certain code paths optimistic. + internal val isNotRestarted: Boolean get() = (ourSenderUUID != null) + /** * What sender identifier to put on messages sent by this flow. This will either be the identifier for the current * state machine manager / messaging client, or null to indicate this flow is restored from a checkpoint and