mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
ENT-1858: Notary cleanup (#4134)
* Migrated all non-BFT notary implementations to use async commits. * Mock network: await for async operation completion. When calling runNetwork() it keeps "pumping" messages between participants until no more messages are generated. The problem comes in when a flow suspends on an async operation: the mock network thinks the flow finished the work for the current step, and since no more messages are generated, completes the runNetwork() function. The message that the flow generates once it resumes after async operation completion never gets processed. This change makes runNetwork() wait until all flow async operations finish, and only then check whether no more messages can be transferred.
This commit is contained in:
parent
4e0a956e20
commit
66116e8d20
@ -1,35 +0,0 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
* if any of the inputs have already been used in another transaction.
|
||||
*/
|
||||
interface AsyncUniquenessProvider : UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<Result>
|
||||
|
||||
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
|
||||
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
|
||||
if (result is Result.Failure) {
|
||||
throw NotaryInternalException(result.error)
|
||||
}
|
||||
}
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
object Success : Result()
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ import net.corda.core.utilities.unwrap
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>() {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputsAndReferences = 10_000
|
||||
@ -34,7 +34,14 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
|
||||
service.commitInputStates(
|
||||
parts.inputs,
|
||||
txId,
|
||||
otherSideSession.counterparty,
|
||||
requestPayload.requestSignature,
|
||||
parts.timestamp,
|
||||
parts.references
|
||||
)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
@ -75,7 +82,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
|
||||
@Suspendable
|
||||
private fun signTransactionAndSendResponse(txId: SecureHash) {
|
||||
val signature = service.sign(txId)
|
||||
val signature = service.signTransaction(txId)
|
||||
otherSideSession.send(NotarisationResponse(listOf(signature)))
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,86 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
|
||||
/** Base implementation for a notary service operated by a singe party. */
|
||||
abstract class SinglePartyNotaryService : NotaryService() {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
protected open val log: Logger get() = staticLog
|
||||
|
||||
/** Handles input state uniqueness checks. */
|
||||
protected abstract val uniquenessProvider: UniquenessProvider
|
||||
|
||||
/** Attempts to commit the specified transaction [txId]. */
|
||||
@Suspendable
|
||||
open fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
|
||||
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
|
||||
// `UniquenessProvider`.
|
||||
|
||||
val callingFlow = FlowLogic.currentTopLevel
|
||||
?: throw IllegalStateException("This method should be invoked in a flow context.")
|
||||
val result = callingFlow.executeAsync(
|
||||
CommitOperation(
|
||||
this,
|
||||
inputs,
|
||||
txId,
|
||||
caller,
|
||||
requestSignature,
|
||||
timeWindow,
|
||||
references
|
||||
)
|
||||
)
|
||||
if (result is UniquenessProvider.Result.Failure) {
|
||||
throw NotaryInternalException(result.error)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Required for the flow to be able to suspend until the commit is complete.
|
||||
* This object will be included in the flow checkpoint.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class CommitOperation(
|
||||
val service: SinglePartyNotaryService,
|
||||
val inputs: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val caller: Party,
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>
|
||||
) : FlowAsyncOperation<Result> {
|
||||
|
||||
override fun execute(deduplicationId: String): CordaFuture<Result> {
|
||||
return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
}
|
||||
}
|
||||
|
||||
/** Sign a single transaction. */
|
||||
fun signTransaction(txId: SecureHash): TransactionSignature {
|
||||
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
||||
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||
}
|
||||
}
|
@ -1,70 +0,0 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
|
||||
/**
|
||||
* A base notary service implementation that provides functionality for cases where a signature by a single member
|
||||
* of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary.
|
||||
*/
|
||||
abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
protected open val log: Logger get() = staticLog
|
||||
protected abstract val uniquenessProvider: UniquenessProvider
|
||||
|
||||
/**
|
||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
@JvmOverloads
|
||||
fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef> = emptyList()
|
||||
) {
|
||||
try {
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
} catch (e: NotaryInternalException) {
|
||||
if (e.error is NotaryError.Conflict) {
|
||||
val allInputs = inputs + references
|
||||
val conflicts = allInputs.filterIndexed { _, stateRef ->
|
||||
val cause = e.error.consumedStates[stateRef]
|
||||
cause != null && cause.hashOfTransactionId != txId.sha256()
|
||||
}
|
||||
if (conflicts.isNotEmpty()) {
|
||||
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
|
||||
log.info("Notary conflicts for $txId: $conflicts")
|
||||
throw e
|
||||
}
|
||||
} else throw e
|
||||
} catch (e: Exception) {
|
||||
log.error("Internal error", e)
|
||||
throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later")))
|
||||
}
|
||||
}
|
||||
|
||||
/** Sign a [ByteArray] input. */
|
||||
fun sign(bits: ByteArray): DigitalSignature.WithKey {
|
||||
return services.keyManagementService.sign(bits, notaryIdentityKey)
|
||||
}
|
||||
|
||||
/** Sign a single transaction. */
|
||||
fun sign(txId: SecureHash): TransactionSignature {
|
||||
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
||||
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||
}
|
||||
|
||||
// TODO: Sign multiple transactions at once by building their Merkle tree and then signing over its root.
|
||||
}
|
@ -1,16 +1,16 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
* if any of the inputs have already been used in another transaction.
|
||||
*
|
||||
* A uniqueness provider is expected to be used from within the context of a flow.
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
@ -21,5 +21,13 @@ interface UniquenessProvider {
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow? = null,
|
||||
references: List<StateRef> = emptyList()
|
||||
)
|
||||
): CordaFuture<Result>
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
object Success : Result()
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
package net.corda.notary.raft
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
@ -13,7 +13,7 @@ import java.security.PublicKey
|
||||
class RaftNotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey
|
||||
) : TrustedAuthorityNotaryService() {
|
||||
) : SinglePartyNotaryService() {
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present")
|
||||
|
||||
|
@ -13,13 +13,14 @@ import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.cluster.Member
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -193,7 +194,7 @@ class RaftUniquenessProvider(
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
log.debug { "Attempting to commit input states: ${states.joinToString()}" }
|
||||
val commitCommand = CommitTransaction(
|
||||
states,
|
||||
@ -203,10 +204,16 @@ class RaftUniquenessProvider(
|
||||
timeWindow,
|
||||
references
|
||||
)
|
||||
val commitError = client.submit(commitCommand).get()
|
||||
if (commitError != null) throw NotaryInternalException(commitError)
|
||||
log.debug { "All input states of transaction $txId have been committed" }
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
client.submit(commitCommand).thenAccept { commitError ->
|
||||
val result = if (commitError != null) {
|
||||
UniquenessProvider.Result.Failure(commitError)
|
||||
} else {
|
||||
log.debug { "All input states of transaction $txId have been committed" }
|
||||
UniquenessProvider.Result.Success
|
||||
}
|
||||
future.set(result)
|
||||
}
|
||||
return future
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,7 +10,7 @@ import net.corda.core.internal.*
|
||||
import net.corda.core.internal.cordapp.CordappImpl
|
||||
import net.corda.core.internal.cordapp.CordappInfoResolver
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
@ -152,7 +152,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
|
||||
// the scanner won't find subclasses deeper down the hierarchy if any intermediate class is not
|
||||
// present in the CorDapp.
|
||||
val result = scanResult.getClassesWithSuperclass(NotaryService::class) +
|
||||
scanResult.getClassesWithSuperclass(TrustedAuthorityNotaryService::class)
|
||||
scanResult.getClassesWithSuperclass(SinglePartyNotaryService::class)
|
||||
logger.info("Found notary service CorDapp implementations: " + result.joinToString(", "))
|
||||
return result.firstOrNull()
|
||||
}
|
||||
|
@ -5,14 +5,14 @@ import net.corda.core.contracts.ComponentGroupEnum
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
/**
|
||||
* The received transaction is not checked for contract-validity, as that would require fully
|
||||
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction
|
||||
|
@ -12,7 +12,7 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
@ -36,7 +36,7 @@ import kotlin.concurrent.thread
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
@MappedSuperclass
|
||||
class BaseComittedState(
|
||||
@ -77,7 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>,
|
||||
val future: OpenFuture<AsyncUniquenessProvider.Result>)
|
||||
val future: OpenFuture<UniquenessProvider.Result>)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
@ -133,15 +133,15 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
*
|
||||
* Returns a future that will complete once the request is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commitAsync(
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<AsyncUniquenessProvider.Result> {
|
||||
val future = openFuture<AsyncUniquenessProvider.Result>()
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future)
|
||||
requestQueue.put(request)
|
||||
return future
|
||||
@ -232,13 +232,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
}
|
||||
|
||||
private fun respondWithSuccess(request: CommitRequest) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Success)
|
||||
request.future.set(UniquenessProvider.Result.Success)
|
||||
}
|
||||
}
|
||||
|
@ -1,14 +1,14 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import java.security.PublicKey
|
||||
|
||||
/** An embedded notary service that uses the node's database to store committed states. */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
|
@ -8,9 +8,9 @@ import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
@ -22,7 +22,7 @@ import java.security.SignatureException
|
||||
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
|
||||
* indeed valid.
|
||||
*/
|
||||
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
/**
|
||||
* Fully resolves the received transaction and its dependencies, runs contract verification logic and checks that
|
||||
* the transaction in question has all required signatures apart from the notary's.
|
||||
|
@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -12,8 +15,9 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -176,8 +180,15 @@ class TimedFlowTests {
|
||||
}.bufferUntilSubscribed().toBlocking().toFuture()
|
||||
}
|
||||
|
||||
private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = mock<UniquenessProvider>()
|
||||
private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
override val uniquenessProvider = object : UniquenessProvider {
|
||||
/** A dummy commit method that immediately returns a success message. */
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<UniquenessProvider.Result> {
|
||||
return openFuture<UniquenessProvider.Result>(). apply {
|
||||
set(UniquenessProvider.Result.Success)
|
||||
}
|
||||
}
|
||||
}
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
|
||||
override fun start() {}
|
||||
override fun stop() {}
|
||||
|
@ -8,14 +8,15 @@ import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.generateStateRef
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
@ -51,27 +52,26 @@ class PersistentUniquenessProviderTests {
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature).get()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature).get()
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
}
|
||||
val response:UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get()
|
||||
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
@ -23,7 +23,7 @@ import java.security.SignatureException
|
||||
* The notary-related APIs might change in the future.
|
||||
*/
|
||||
// START 1
|
||||
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||
|
@ -31,7 +31,6 @@ import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.internal.NodeFlowManager
|
||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
@ -45,6 +44,7 @@ import net.corda.node.services.keys.KeyManagementServiceInternal
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.DefaultNamedCacheFactory
|
||||
@ -436,7 +436,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
|
||||
|
||||
|
||||
fun createUnstartedNode(parameters: InternalMockNodeParameters = InternalMockNodeParameters()): MockNode {
|
||||
return createUnstartedNode(parameters, defaultFactory)
|
||||
}
|
||||
@ -504,11 +503,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
"MockNetwork.runNetwork() should only be used when networkSendManuallyPumped == false. " +
|
||||
"You can use MockNetwork.waitQuiescent() to wait for all the nodes to process all the messages on their queues instead."
|
||||
}
|
||||
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
|
||||
if (rounds == -1) {
|
||||
while (pumpAll().any { it != null }) {
|
||||
}
|
||||
do {
|
||||
awaitAsyncOperations()
|
||||
} while (pumpAll())
|
||||
} else {
|
||||
repeat(rounds) {
|
||||
pumpAll()
|
||||
@ -516,6 +515,32 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
}
|
||||
|
||||
private fun pumpAll(): Boolean {
|
||||
val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
return transferredMessages.any { it != null }
|
||||
}
|
||||
|
||||
/**
|
||||
* We wait for any flows that are suspended on an async operation completion to resume and either
|
||||
* finish the flow, or generate a response message.
|
||||
*/
|
||||
private fun awaitAsyncOperations() {
|
||||
while (anyFlowsSuspendedOnAsyncOperation()) {
|
||||
Thread.sleep(50)
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if there are any flows suspended waiting for an async operation to complete. */
|
||||
private fun anyFlowsSuspendedOnAsyncOperation(): Boolean {
|
||||
val allNodes = this._nodes
|
||||
val allActiveFlows = allNodes.flatMap { it.smm.snapshot() }
|
||||
|
||||
return allActiveFlows.any {
|
||||
val flowState = it.snapshot().checkpoint.flowState
|
||||
flowState is FlowState.Started && flowState.flowIORequest is FlowIORequest.ExecuteAsyncOperation
|
||||
}
|
||||
}
|
||||
|
||||
@JvmOverloads
|
||||
fun createPartyNode(legalName: CordaX500Name? = null): TestStartedNode {
|
||||
return createNode(InternalMockNodeParameters(legalName = legalName))
|
||||
|
Loading…
Reference in New Issue
Block a user