From 66116e8d20f8df2d3dfc442b08ccf8deb01b576e Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Thu, 1 Nov 2018 19:03:43 +0000 Subject: [PATCH] ENT-1858: Notary cleanup (#4134) * Migrated all non-BFT notary implementations to use async commits. * Mock network: await for async operation completion. When calling runNetwork() it keeps "pumping" messages between participants until no more messages are generated. The problem comes in when a flow suspends on an async operation: the mock network thinks the flow finished the work for the current step, and since no more messages are generated, completes the runNetwork() function. The message that the flow generates once it resumes after async operation completion never gets processed. This change makes runNetwork() wait until all flow async operations finish, and only then check whether no more messages can be transferred. --- .../notary/AsyncUniquenessProvider.kt | 35 -------- .../core/internal/notary/NotaryServiceFlow.kt | 13 ++- .../notary/SinglePartyNotaryService.kt | 86 +++++++++++++++++++ .../notary/TrustedAuthorityNotaryService.kt | 70 --------------- .../internal/notary/UniquenessProvider.kt | 14 ++- .../corda/notary/raft/RaftNotaryService.kt | 4 +- .../notary/raft/RaftUniquenessProvider.kt | 21 +++-- .../cordapp/JarScanningCordappLoader.kt | 4 +- .../transactions/NonValidatingNotaryFlow.kt | 4 +- .../PersistentUniquenessProvider.kt | 16 ++-- .../transactions/SimpleNotaryService.kt | 4 +- .../transactions/ValidatingNotaryFlow.kt | 4 +- .../net/corda/node/services/TimedFlowTests.kt | 17 +++- .../PersistentUniquenessProviderTests.kt | 30 +++---- .../corda/notarydemo/MyCustomNotaryService.kt | 4 +- .../node/internal/InternalMockNetwork.kt | 35 ++++++-- 16 files changed, 200 insertions(+), 161 deletions(-) delete mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt delete mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt deleted file mode 100644 index 1d9328de88..0000000000 --- a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt +++ /dev/null @@ -1,35 +0,0 @@ -package net.corda.core.internal.notary - -import net.corda.core.concurrent.CordaFuture -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.SecureHash -import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.flows.NotaryError -import net.corda.core.identity.Party - -/** - * A service that records input states of the given transaction and provides conflict information - * if any of the inputs have already been used in another transaction. - */ -interface AsyncUniquenessProvider : UniquenessProvider { - /** Commits all input states of the given transaction. */ - fun commitAsync(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture - - /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */ - override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) { - val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get() - if (result is Result.Failure) { - throw NotaryInternalException(result.error) - } - } - - /** The outcome of committing a transaction. */ - sealed class Result { - /** Indicates that all input states have been committed successfully. */ - object Success : Result() - /** Indicates that the transaction has not been committed. */ - data class Failure(val error: NotaryError) : Result() - } -} - diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index b6e6f49865..bcd2e8d0a4 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -17,7 +17,7 @@ import net.corda.core.utilities.unwrap * Additional transaction validation logic can be added when implementing [validateRequest]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? -abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic() { +abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic() { companion object { // TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder. private const val maxAllowedInputsAndReferences = 10_000 @@ -34,7 +34,14 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: val parts = validateRequest(requestPayload) txId = parts.id checkNotary(parts.notary) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references) + service.commitInputStates( + parts.inputs, + txId, + otherSideSession.counterparty, + requestPayload.requestSignature, + parts.timestamp, + parts.references + ) signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) @@ -75,7 +82,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: @Suspendable private fun signTransactionAndSendResponse(txId: SecureHash) { - val signature = service.sign(txId) + val signature = service.signTransaction(txId) otherSideSession.send(NotarisationResponse(listOf(signature))) } diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt new file mode 100644 index 0000000000..56fe34a7ff --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt @@ -0,0 +1,86 @@ +package net.corda.core.internal.notary + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.identity.Party +import net.corda.core.internal.FlowAsyncOperation +import net.corda.core.internal.executeAsync +import net.corda.core.internal.notary.UniquenessProvider.Result +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.contextLogger +import org.slf4j.Logger + +/** Base implementation for a notary service operated by a singe party. */ +abstract class SinglePartyNotaryService : NotaryService() { + companion object { + private val staticLog = contextLogger() + } + + protected open val log: Logger get() = staticLog + + /** Handles input state uniqueness checks. */ + protected abstract val uniquenessProvider: UniquenessProvider + + /** Attempts to commit the specified transaction [txId]. */ + @Suspendable + open fun commitInputStates( + inputs: List, + txId: SecureHash, + caller: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?, + references: List + ) { + // TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance + // when we concurrently log requests here as part of the flows, instead of logging sequentially in the + // `UniquenessProvider`. + + val callingFlow = FlowLogic.currentTopLevel + ?: throw IllegalStateException("This method should be invoked in a flow context.") + val result = callingFlow.executeAsync( + CommitOperation( + this, + inputs, + txId, + caller, + requestSignature, + timeWindow, + references + ) + ) + if (result is UniquenessProvider.Result.Failure) { + throw NotaryInternalException(result.error) + } + } + + /** + * Required for the flow to be able to suspend until the commit is complete. + * This object will be included in the flow checkpoint. + */ + @CordaSerializable + class CommitOperation( + val service: SinglePartyNotaryService, + val inputs: List, + val txId: SecureHash, + val caller: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?, + val references: List + ) : FlowAsyncOperation { + + override fun execute(deduplicationId: String): CordaFuture { + return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references) + } + } + + /** Sign a single transaction. */ + fun signTransaction(txId: SecureHash): TransactionSignature { + val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) + return services.keyManagementService.sign(signableData, notaryIdentityKey) + } +} diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt deleted file mode 100644 index 148b506e82..0000000000 --- a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt +++ /dev/null @@ -1,70 +0,0 @@ -package net.corda.core.internal.notary - -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.* -import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.flows.NotaryError -import net.corda.core.identity.Party -import net.corda.core.utilities.contextLogger -import org.slf4j.Logger - -/** - * A base notary service implementation that provides functionality for cases where a signature by a single member - * of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary. - */ -abstract class TrustedAuthorityNotaryService : NotaryService() { - companion object { - private val staticLog = contextLogger() - } - - protected open val log: Logger get() = staticLog - protected abstract val uniquenessProvider: UniquenessProvider - - /** - * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that - * this method does not throw an exception when input states are present multiple times within the transaction. - */ - @JvmOverloads - fun commitInputStates( - inputs: List, - txId: SecureHash, - caller: Party, - requestSignature: NotarisationRequestSignature, - timeWindow: TimeWindow?, - references: List = emptyList() - ) { - try { - uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references) - } catch (e: NotaryInternalException) { - if (e.error is NotaryError.Conflict) { - val allInputs = inputs + references - val conflicts = allInputs.filterIndexed { _, stateRef -> - val cause = e.error.consumedStates[stateRef] - cause != null && cause.hashOfTransactionId != txId.sha256() - } - if (conflicts.isNotEmpty()) { - // TODO: Create a new UniquenessException that only contains the conflicts filtered above. - log.info("Notary conflicts for $txId: $conflicts") - throw e - } - } else throw e - } catch (e: Exception) { - log.error("Internal error", e) - throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later"))) - } - } - - /** Sign a [ByteArray] input. */ - fun sign(bits: ByteArray): DigitalSignature.WithKey { - return services.keyManagementService.sign(bits, notaryIdentityKey) - } - - /** Sign a single transaction. */ - fun sign(txId: SecureHash): TransactionSignature { - val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) - return services.keyManagementService.sign(signableData, notaryIdentityKey) - } - - // TODO: Sign multiple transactions at once by building their Merkle tree and then signing over its root. -} diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt index 04d97915c6..aebf618af1 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt @@ -1,16 +1,16 @@ package net.corda.core.internal.notary +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError import net.corda.core.identity.Party /** * A service that records input states of the given transaction and provides conflict information * if any of the inputs have already been used in another transaction. - * - * A uniqueness provider is expected to be used from within the context of a flow. */ interface UniquenessProvider { /** Commits all input states of the given transaction. */ @@ -21,5 +21,13 @@ interface UniquenessProvider { requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow? = null, references: List = emptyList() - ) + ): CordaFuture + + /** The outcome of committing a transaction. */ + sealed class Result { + /** Indicates that all input states have been committed successfully. */ + object Success : Result() + /** Indicates that the transaction has not been committed. */ + data class Failure(val error: NotaryError) : Result() + } } \ No newline at end of file diff --git a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt index 97843cb13c..1b687b2ec3 100644 --- a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt +++ b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt @@ -1,8 +1,8 @@ package net.corda.notary.raft import net.corda.core.flows.FlowSession +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow @@ -13,7 +13,7 @@ import java.security.PublicKey class RaftNotaryService( override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey -) : TrustedAuthorityNotaryService() { +) : SinglePartyNotaryService() { private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present") diff --git a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt index f3b7671a3e..98bdbf4566 100644 --- a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt +++ b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt @@ -13,13 +13,14 @@ import io.atomix.copycat.server.CopycatServer import io.atomix.copycat.server.cluster.Member import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party import net.corda.core.internal.NamedCacheFactory -import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken @@ -193,7 +194,7 @@ class RaftUniquenessProvider( requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ) { + ): CordaFuture { log.debug { "Attempting to commit input states: ${states.joinToString()}" } val commitCommand = CommitTransaction( states, @@ -203,10 +204,16 @@ class RaftUniquenessProvider( timeWindow, references ) - val commitError = client.submit(commitCommand).get() - if (commitError != null) throw NotaryInternalException(commitError) - log.debug { "All input states of transaction $txId have been committed" } + val future = openFuture() + client.submit(commitCommand).thenAccept { commitError -> + val result = if (commitError != null) { + UniquenessProvider.Result.Failure(commitError) + } else { + log.debug { "All input states of transaction $txId have been committed" } + UniquenessProvider.Result.Success + } + future.set(result) + } + return future } } - - diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt index c27df16f0f..01522caef5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt @@ -10,7 +10,7 @@ import net.corda.core.internal.* import net.corda.core.internal.cordapp.CordappImpl import net.corda.core.internal.cordapp.CordappInfoResolver import net.corda.core.internal.notary.NotaryService -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.node.services.CordaService import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SerializationCustomSerializer @@ -152,7 +152,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: // the scanner won't find subclasses deeper down the hierarchy if any intermediate class is not // present in the CorDapp. val result = scanResult.getClassesWithSuperclass(NotaryService::class) + - scanResult.getClassesWithSuperclass(TrustedAuthorityNotaryService::class) + scanResult.getClassesWithSuperclass(SinglePartyNotaryService::class) logger.info("Found notary service CorDapp implementations: " + result.joinToString(", ")) return result.firstOrNull() } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index 38b17af15a..a1ce69da9d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -5,14 +5,14 @@ import net.corda.core.contracts.ComponentGroupEnum import net.corda.core.flows.FlowSession import net.corda.core.flows.NotarisationPayload import net.corda.core.flows.NotarisationRequest +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.transactions.ContractUpgradeFilteredTransaction import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.NotaryChangeWireTransaction -class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) { +class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { /** * The received transaction is not checked for contract-validity, as that would require fully * resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 122e10d3c3..9b82d98e89 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -12,7 +12,7 @@ import net.corda.core.identity.Party import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.isConsumedByTheSameTx import net.corda.core.internal.notary.validateTimeWindow @@ -36,7 +36,7 @@ import kotlin.concurrent.thread /** A RDBMS backed Uniqueness provider */ @ThreadSafe -class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() { +class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() { @MappedSuperclass class BaseComittedState( @@ -77,7 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste val requestSignature: NotarisationRequestSignature, val timeWindow: TimeWindow?, val references: List, - val future: OpenFuture) + val future: OpenFuture) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") @@ -133,15 +133,15 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste * * Returns a future that will complete once the request is processed, containing the commit [Result]. */ - override fun commitAsync( + override fun commit( states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ): CordaFuture { - val future = openFuture() + ): CordaFuture { + val future = openFuture() val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future) requestQueue.put(request) return future @@ -232,13 +232,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste private fun respondWithError(request: CommitRequest, exception: Exception) { if (exception is NotaryInternalException) { - request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) + request.future.set(UniquenessProvider.Result.Failure(exception.error)) } else { request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) } } private fun respondWithSuccess(request: CommitRequest) { - request.future.set(AsyncUniquenessProvider.Result.Success) + request.future.set(UniquenessProvider.Result.Success) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index d382b2feae..8efcbdc930 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -1,14 +1,14 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.schemas.MappedSchema import net.corda.node.services.api.ServiceHubInternal import java.security.PublicKey /** An embedded notary service that uses the node's database to store committed states. */ -class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { +class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index a4d4ea2fb0..e4b470d0a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -8,9 +8,9 @@ import net.corda.core.flows.NotarisationPayload import net.corda.core.flows.NotarisationRequest import net.corda.core.flows.NotaryError import net.corda.core.internal.ResolveTransactionsFlow +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction @@ -22,7 +22,7 @@ import java.security.SignatureException * has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was * indeed valid. */ -class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) { +class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { /** * Fully resolves the received transaction and its dependencies, runs contract verification logic and checks that * the transaction in question has all required signatures apart from the notary's. diff --git a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt index 86c6283f62..a2c3e71365 100644 --- a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name @@ -12,8 +15,9 @@ import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.node.NotaryInfo import net.corda.core.transactions.SignedTransaction @@ -176,8 +180,15 @@ class TimedFlowTests { }.bufferUntilSubscribed().toBlocking().toFuture() } - private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = mock() + private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { + override val uniquenessProvider = object : UniquenessProvider { + /** A dummy commit method that immediately returns a success message. */ + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture { + return openFuture(). apply { + set(UniquenessProvider.Result.Success) + } + } + } override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = TestNotaryFlow(otherPartySession, this) override fun start() {} override fun stop() {} diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index 3c63c71847..daa8132459 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -8,14 +8,15 @@ import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.identity.CordaX500Name import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.UniquenessProvider import net.corda.node.services.schema.NodeSchemaService -import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.core.generateStateRef import net.corda.testing.internal.LogHelper +import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After @@ -51,27 +52,26 @@ class PersistentUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory()) - val inputState = generateStateRef() + val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity, requestSignature) + provider.commit(listOf(inputState), txID, identity, requestSignature).get() } @Test fun `should report a conflict for a transaction with previously used inputs`() { val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory()) - val inputState = generateStateRef() + val inputState = generateStateRef() - val inputs = listOf(inputState) - val firstTxId = txID - provider.commit(inputs, firstTxId, identity, requestSignature) + val inputs = listOf(inputState) + val firstTxId = txID + provider.commit(inputs, firstTxId, identity, requestSignature).get() - val secondTxId = SecureHash.randomSHA256() - val ex = assertFailsWith { - provider.commit(inputs, secondTxId, identity, requestSignature) - } - val error = ex.error as NotaryError.Conflict + val secondTxId = SecureHash.randomSHA256() - val conflictCause = error.consumedStates[inputState]!! - assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) - } + val response:UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get() + val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict + + val conflictCause = error.consumedStates[inputState]!! + assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) + } } diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index 51bc918242..4af9f4fb3e 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -7,7 +7,7 @@ import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction @@ -23,7 +23,7 @@ import java.security.SignatureException * The notary-related APIs might change in the future. */ // START 1 -class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { +class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index f280cee72d..fe30386b6c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -31,7 +31,6 @@ import net.corda.node.VersionInfo import net.corda.node.internal.AbstractNode import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.NodeFlowManager -import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.StartedNodeServices @@ -45,6 +44,7 @@ import net.corda.node.services.keys.KeyManagementServiceInternal import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.MessagingService import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.DefaultNamedCacheFactory @@ -436,7 +436,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe } - fun createUnstartedNode(parameters: InternalMockNodeParameters = InternalMockNodeParameters()): MockNode { return createUnstartedNode(parameters, defaultFactory) } @@ -504,11 +503,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe "MockNetwork.runNetwork() should only be used when networkSendManuallyPumped == false. " + "You can use MockNetwork.waitQuiescent() to wait for all the nodes to process all the messages on their queues instead." } - fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) } if (rounds == -1) { - while (pumpAll().any { it != null }) { - } + do { + awaitAsyncOperations() + } while (pumpAll()) } else { repeat(rounds) { pumpAll() @@ -516,6 +515,32 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe } } + private fun pumpAll(): Boolean { + val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) } + return transferredMessages.any { it != null } + } + + /** + * We wait for any flows that are suspended on an async operation completion to resume and either + * finish the flow, or generate a response message. + */ + private fun awaitAsyncOperations() { + while (anyFlowsSuspendedOnAsyncOperation()) { + Thread.sleep(50) + } + } + + /** Returns true if there are any flows suspended waiting for an async operation to complete. */ + private fun anyFlowsSuspendedOnAsyncOperation(): Boolean { + val allNodes = this._nodes + val allActiveFlows = allNodes.flatMap { it.smm.snapshot() } + + return allActiveFlows.any { + val flowState = it.snapshot().checkpoint.flowState + flowState is FlowState.Started && flowState.flowIORequest is FlowIORequest.ExecuteAsyncOperation + } + } + @JvmOverloads fun createPartyNode(legalName: CordaX500Name? = null): TestStartedNode { return createNode(InternalMockNodeParameters(legalName = legalName))