diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt deleted file mode 100644 index 36593eca76..0000000000 --- a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt +++ /dev/null @@ -1,35 +0,0 @@ -package net.corda.core.internal.notary - -import net.corda.core.concurrent.CordaFuture -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.SecureHash -import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.flows.NotaryError -import net.corda.core.identity.Party - -/** - * A service that records input states of the given transaction and provides conflict information - * if any of the inputs have already been used in another transaction. - */ -interface AsyncUniquenessProvider : UniquenessProvider { - /** Commits all input states of the given transaction. */ - fun commitAsync(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture - - /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */ - override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) { - val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references).get() - if (result is Result.Failure) { - throw NotaryInternalException(result.error) - } - } - - /** The outcome of committing a transaction. */ - sealed class Result { - /** Indicates that all input states have been committed successfully. */ - object Success : Result() - /** Indicates that the transaction has not been committed. */ - data class Failure(val error: NotaryError) : Result() - } -} - diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index 0dae619cd8..938039e03d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -6,9 +6,6 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.* import net.corda.core.identity.Party -import net.corda.core.internal.IdempotentFlow -import net.corda.core.internal.executeAsync -import net.corda.core.internal.notary.AsyncUniquenessProvider.Result import net.corda.core.utilities.unwrap /** @@ -20,7 +17,7 @@ import net.corda.core.utilities.unwrap * Additional transaction validation logic can be added when implementing [validateRequest]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? -abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic(), IdempotentFlow { +abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic() { companion object { // TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder. private const val maxAllowedInputsAndReferences = 10_000 @@ -37,7 +34,14 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: val parts = validateRequest(requestPayload) txId = parts.id checkNotary(parts.notary) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references) + service.commitInputStates( + parts.inputs, + txId, + otherSideSession.counterparty, + requestPayload.requestSignature, + parts.timestamp, + parts.references + ) signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) @@ -78,7 +82,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: @Suspendable private fun signTransactionAndSendResponse(txId: SecureHash) { - val signature = service.sign(txId) + val signature = service.signTransaction(txId) otherSideSession.send(NotarisationResponse(listOf(signature))) } diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt new file mode 100644 index 0000000000..56fe34a7ff --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt @@ -0,0 +1,86 @@ +package net.corda.core.internal.notary + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.identity.Party +import net.corda.core.internal.FlowAsyncOperation +import net.corda.core.internal.executeAsync +import net.corda.core.internal.notary.UniquenessProvider.Result +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.contextLogger +import org.slf4j.Logger + +/** Base implementation for a notary service operated by a singe party. */ +abstract class SinglePartyNotaryService : NotaryService() { + companion object { + private val staticLog = contextLogger() + } + + protected open val log: Logger get() = staticLog + + /** Handles input state uniqueness checks. */ + protected abstract val uniquenessProvider: UniquenessProvider + + /** Attempts to commit the specified transaction [txId]. */ + @Suspendable + open fun commitInputStates( + inputs: List, + txId: SecureHash, + caller: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?, + references: List + ) { + // TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance + // when we concurrently log requests here as part of the flows, instead of logging sequentially in the + // `UniquenessProvider`. + + val callingFlow = FlowLogic.currentTopLevel + ?: throw IllegalStateException("This method should be invoked in a flow context.") + val result = callingFlow.executeAsync( + CommitOperation( + this, + inputs, + txId, + caller, + requestSignature, + timeWindow, + references + ) + ) + if (result is UniquenessProvider.Result.Failure) { + throw NotaryInternalException(result.error) + } + } + + /** + * Required for the flow to be able to suspend until the commit is complete. + * This object will be included in the flow checkpoint. + */ + @CordaSerializable + class CommitOperation( + val service: SinglePartyNotaryService, + val inputs: List, + val txId: SecureHash, + val caller: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?, + val references: List + ) : FlowAsyncOperation { + + override fun execute(deduplicationId: String): CordaFuture { + return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references) + } + } + + /** Sign a single transaction. */ + fun signTransaction(txId: SecureHash): TransactionSignature { + val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) + return services.keyManagementService.sign(signableData, notaryIdentityKey) + } +} diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt deleted file mode 100644 index a9466b11b4..0000000000 --- a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt +++ /dev/null @@ -1,72 +0,0 @@ -package net.corda.core.internal.notary - -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.* -import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.flows.NotaryError -import net.corda.core.identity.Party -import net.corda.core.utilities.contextLogger -import org.slf4j.Logger - -/** - * A base notary service implementation that provides functionality for cases where a signature by a single member - * of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary. - */ -abstract class TrustedAuthorityNotaryService : NotaryService() { - companion object { - private val staticLog = contextLogger() - } - - protected open val log: Logger get() = staticLog - protected abstract val uniquenessProvider: UniquenessProvider - - /** - * @throws NotaryException if any of the states have been consumed by a different transaction. Note that - * this method does not throw an exception when input states are present multiple times within the transaction. - */ - @JvmOverloads - @Suspendable - open fun commitInputStates( - inputs: List, - txId: SecureHash, - caller: Party, - requestSignature: NotarisationRequestSignature, - timeWindow: TimeWindow?, - references: List = emptyList() - ) { - try { - uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references) - } catch (e: NotaryInternalException) { - if (e.error is NotaryError.Conflict) { - val allInputs = inputs + references - val conflicts = allInputs.filterIndexed { _, stateRef -> - val cause = e.error.consumedStates[stateRef] - cause != null && cause.hashOfTransactionId != txId.sha256() - } - if (conflicts.isNotEmpty()) { - // TODO: Create a new UniquenessException that only contains the conflicts filtered above. - log.info("Notary conflicts for $txId: $conflicts") - throw e - } - } else throw e - } catch (e: Exception) { - log.error("Internal error", e) - throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later"))) - } - } - - /** Sign a [ByteArray] input. */ - fun sign(bits: ByteArray): DigitalSignature.WithKey { - return services.keyManagementService.sign(bits, notaryIdentityKey) - } - - /** Sign a single transaction. */ - fun sign(txId: SecureHash): TransactionSignature { - val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) - return services.keyManagementService.sign(signableData, notaryIdentityKey) - } - - // TODO: Sign multiple transactions at once by building their Merkle tree and then signing over its root. -} diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt index 04d97915c6..aebf618af1 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt @@ -1,16 +1,16 @@ package net.corda.core.internal.notary +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError import net.corda.core.identity.Party /** * A service that records input states of the given transaction and provides conflict information * if any of the inputs have already been used in another transaction. - * - * A uniqueness provider is expected to be used from within the context of a flow. */ interface UniquenessProvider { /** Commits all input states of the given transaction. */ @@ -21,5 +21,13 @@ interface UniquenessProvider { requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow? = null, references: List = emptyList() - ) + ): CordaFuture + + /** The outcome of committing a transaction. */ + sealed class Result { + /** Indicates that all input states have been committed successfully. */ + object Success : Result() + /** Indicates that the transaction has not been committed. */ + data class Failure(val error: NotaryError) : Result() + } } \ No newline at end of file diff --git a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt index 97843cb13c..1b687b2ec3 100644 --- a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt +++ b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt @@ -1,8 +1,8 @@ package net.corda.notary.raft import net.corda.core.flows.FlowSession +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow @@ -13,7 +13,7 @@ import java.security.PublicKey class RaftNotaryService( override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey -) : TrustedAuthorityNotaryService() { +) : SinglePartyNotaryService() { private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present") diff --git a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt index f3b7671a3e..98bdbf4566 100644 --- a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt +++ b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftUniquenessProvider.kt @@ -13,13 +13,14 @@ import io.atomix.copycat.server.CopycatServer import io.atomix.copycat.server.cluster.Member import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party import net.corda.core.internal.NamedCacheFactory -import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken @@ -193,7 +194,7 @@ class RaftUniquenessProvider( requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ) { + ): CordaFuture { log.debug { "Attempting to commit input states: ${states.joinToString()}" } val commitCommand = CommitTransaction( states, @@ -203,10 +204,16 @@ class RaftUniquenessProvider( timeWindow, references ) - val commitError = client.submit(commitCommand).get() - if (commitError != null) throw NotaryInternalException(commitError) - log.debug { "All input states of transaction $txId have been committed" } + val future = openFuture() + client.submit(commitCommand).thenAccept { commitError -> + val result = if (commitError != null) { + UniquenessProvider.Result.Failure(commitError) + } else { + log.debug { "All input states of transaction $txId have been committed" } + UniquenessProvider.Result.Success + } + future.set(result) + } + return future } } - - diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt index eae8f429af..3f9e9bb8f5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt @@ -10,9 +10,8 @@ import net.corda.core.flows.* import net.corda.core.internal.* import net.corda.core.internal.cordapp.CordappImpl import net.corda.core.internal.cordapp.CordappInfoResolver -import net.corda.core.internal.notary.AsyncCFTNotaryService import net.corda.core.internal.notary.NotaryService -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.node.services.CordaService import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SerializationCustomSerializer @@ -154,8 +153,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: // the scanner won't find subclasses deeper down the hierarchy if any intermediate class is not // present in the CorDapp. val result = scanResult.getClassesWithSuperclass(NotaryService::class) + - scanResult.getClassesWithSuperclass(TrustedAuthorityNotaryService::class) + - scanResult.getClassesWithSuperclass(AsyncCFTNotaryService::class) + scanResult.getClassesWithSuperclass(SinglePartyNotaryService::class) logger.info("Found notary service CorDapp implementations: " + result.joinToString(", ")) return result.firstOrNull() } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index 38b17af15a..a1ce69da9d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -5,14 +5,14 @@ import net.corda.core.contracts.ComponentGroupEnum import net.corda.core.flows.FlowSession import net.corda.core.flows.NotarisationPayload import net.corda.core.flows.NotarisationRequest +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.transactions.ContractUpgradeFilteredTransaction import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.NotaryChangeWireTransaction -class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) { +class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { /** * The received transaction is not checked for contract-validity, as that would require fully * resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 122e10d3c3..9b82d98e89 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -12,7 +12,7 @@ import net.corda.core.identity.Party import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.isConsumedByTheSameTx import net.corda.core.internal.notary.validateTimeWindow @@ -36,7 +36,7 @@ import kotlin.concurrent.thread /** A RDBMS backed Uniqueness provider */ @ThreadSafe -class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() { +class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() { @MappedSuperclass class BaseComittedState( @@ -77,7 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste val requestSignature: NotarisationRequestSignature, val timeWindow: TimeWindow?, val references: List, - val future: OpenFuture) + val future: OpenFuture) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") @@ -133,15 +133,15 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste * * Returns a future that will complete once the request is processed, containing the commit [Result]. */ - override fun commitAsync( + override fun commit( states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ): CordaFuture { - val future = openFuture() + ): CordaFuture { + val future = openFuture() val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future) requestQueue.put(request) return future @@ -232,13 +232,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste private fun respondWithError(request: CommitRequest, exception: Exception) { if (exception is NotaryInternalException) { - request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) + request.future.set(UniquenessProvider.Result.Failure(exception.error)) } else { request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) } } private fun respondWithSuccess(request: CommitRequest) { - request.future.set(AsyncUniquenessProvider.Result.Success) + request.future.set(UniquenessProvider.Result.Success) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index b4f42baebe..d881625001 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -1,14 +1,14 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.schemas.MappedSchema import net.corda.node.services.api.ServiceHubInternal import java.security.PublicKey /** An embedded notary service that uses the node's database to store committed states. */ -class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { +class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { private val notaryConfig = services.configuration.notary ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index a4d4ea2fb0..e4b470d0a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -8,9 +8,9 @@ import net.corda.core.flows.NotarisationPayload import net.corda.core.flows.NotarisationRequest import net.corda.core.flows.NotaryError import net.corda.core.internal.ResolveTransactionsFlow +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction @@ -22,7 +22,7 @@ import java.security.SignatureException * has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was * indeed valid. */ -class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) { +class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { /** * Fully resolves the received transaction and its dependencies, runs contract verification logic and checks that * the transaction in question has all required signatures apart from the notary's. diff --git a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt index 97fb55379f..056ca6db4e 100644 --- a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name @@ -12,8 +15,9 @@ import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.node.NotaryInfo import net.corda.core.transactions.SignedTransaction @@ -52,19 +56,19 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() { lateinit var notary: Party lateinit var node: TestStartedNode - private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair { - val replicaIds = (0 until clusterSize) - val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH") - val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( - replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, - serviceLegalName) + private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair { + val replicaIds = (0 until clusterSize) + val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH") + val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( + replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, + serviceLegalName) - val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true)))) - val notaryConfig = mock { - whenever(it.serviceLegalName).thenReturn(serviceLegalName) - whenever(it.validating).thenReturn(true) - whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name) - } + val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true)))) + val notaryConfig = mock { + whenever(it.serviceLegalName).thenReturn(serviceLegalName) + whenever(it.validating).thenReturn(true) + whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name) + } val notaryNodes = (0 until clusterSize).map { mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = { @@ -128,7 +132,8 @@ class TimedFlowTests { /** The notary nodes don't run any consensus protocol, so 2 nodes are sufficient for the purpose of this test. */ private val globalRule = TimedFlowTestRule(2) - @ClassRule @JvmField + @ClassRule + @JvmField val ruleChain = RuleChain.outerRule(globalDatabaseRule).around(globalRule) } @@ -190,8 +195,16 @@ class TimedFlowTests { }.bufferUntilSubscribed().toBlocking().toFuture() } - class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = mock() + class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { + override val uniquenessProvider = object : UniquenessProvider { + /** A dummy commit method that immediately returns a success message. */ + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture { + return openFuture().apply { + set(UniquenessProvider.Result.Success) + } + } + } + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = TestNotaryFlow(otherPartySession, this) override fun start() {} override fun stop() {} diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index b8e63f4b13..11e7f953d5 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -7,7 +7,7 @@ import net.corda.core.crypto.sha256 import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.UniquenessProvider import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -24,7 +24,6 @@ import org.junit.Rule import org.junit.Test import java.time.Clock import kotlin.test.assertEquals -import kotlin.test.assertFailsWith class PersistentUniquenessProviderTests { @Rule @@ -51,9 +50,9 @@ class PersistentUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory()) - val inputState = generateStateRef() + val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity, requestSignature) + provider.commit(listOf(inputState), txID, identity, requestSignature).get() } @Test @@ -63,15 +62,12 @@ class PersistentUniquenessProviderTests { val inputs = listOf(inputState) val firstTxId = txID - provider.commit(inputs, firstTxId, identity, requestSignature) - - provider.commit(inputs, firstTxId, identity, requestSignature) + provider.commit(inputs, firstTxId, identity, requestSignature).get() val secondTxId = SecureHash.randomSHA256() - val ex = assertFailsWith { - provider.commit(inputs, secondTxId, identity, requestSignature) - } - val error = ex.error as NotaryError.Conflict + + val response: UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get() + val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict val conflictCause = error.consumedStates[inputState]!! assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index 51bc918242..4af9f4fb3e 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -7,7 +7,7 @@ import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction @@ -23,7 +23,7 @@ import java.security.SignatureException * The notary-related APIs might change in the future. */ // START 1 -class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { +class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 4d42440b28..63475720eb 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -31,7 +31,6 @@ import net.corda.node.VersionInfo import net.corda.node.internal.AbstractNode import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.NodeFlowManager -import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.StartedNodeServices @@ -42,6 +41,7 @@ import net.corda.node.services.keys.KeyManagementServiceInternal import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.MessagingService import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.EnterpriseNamedCacheFactory @@ -433,7 +433,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe } - fun createUnstartedNode(parameters: InternalMockNodeParameters = InternalMockNodeParameters()): MockNode { return createUnstartedNode(parameters, defaultFactory) } @@ -502,11 +501,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe "MockNetwork.runNetwork() should only be used when networkSendManuallyPumped == false. " + "You can use MockNetwork.waitQuiescent() to wait for all the nodes to process all the messages on their queues instead." } - fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) } if (rounds == -1) { - while (pumpAll().any { it != null }) { - } + do { + awaitAsyncOperations() + } while (pumpAll()) } else { repeat(rounds) { pumpAll() @@ -514,6 +513,32 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe } } + private fun pumpAll(): Boolean { + val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) } + return transferredMessages.any { it != null } + } + + /** + * We wait for any flows that are suspended on an async operation completion to resume and either + * finish the flow, or generate a response message. + */ + private fun awaitAsyncOperations() { + while (anyFlowsSuspendedOnAsyncOperation()) { + Thread.sleep(50) + } + } + + /** Returns true if there are any flows suspended waiting for an async operation to complete. */ + private fun anyFlowsSuspendedOnAsyncOperation(): Boolean { + val allNodes = this._nodes + val allActiveFlows = allNodes.flatMap { it.smm.snapshot() } + + return allActiveFlows.any { + val flowState = it.snapshot().checkpoint.flowState + flowState is FlowState.Started && flowState.flowIORequest is FlowIORequest.ExecuteAsyncOperation + } + } + @JvmOverloads fun createPartyNode(legalName: CordaX500Name? = null): TestStartedNode { return createNode(InternalMockNodeParameters(legalName = legalName))