Merge remote-tracking branch 'open/master' into andrius/merge-11-02

This commit is contained in:
Andrius Dagys 2018-11-02 09:08:03 +00:00
commit 539fac0d57
16 changed files with 207 additions and 177 deletions

View File

@ -1,35 +0,0 @@
package net.corda.core.internal.notary
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
/**
* A service that records input states of the given transaction and provides conflict information
* if any of the inputs have already been used in another transaction.
*/
interface AsyncUniquenessProvider : UniquenessProvider {
/** Commits all input states of the given transaction. */
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<Result>
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references).get()
if (result is Result.Failure) {
throw NotaryInternalException(result.error)
}
}
/** The outcome of committing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
object Success : Result()
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()
}
}

View File

@ -6,9 +6,6 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.utilities.unwrap
/**
@ -20,7 +17,7 @@ import net.corda.core.utilities.unwrap
* Additional transaction validation logic can be added when implementing [validateRequest].
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>() {
companion object {
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
private const val maxAllowedInputsAndReferences = 10_000
@ -37,7 +34,14 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
val parts = validateRequest(requestPayload)
txId = parts.id
checkNotary(parts.notary)
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
service.commitInputStates(
parts.inputs,
txId,
otherSideSession.counterparty,
requestPayload.requestSignature,
parts.timestamp,
parts.references
)
signTransactionAndSendResponse(txId)
} catch (e: NotaryInternalException) {
throw NotaryException(e.error, txId)
@ -78,7 +82,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
@Suspendable
private fun signTransactionAndSendResponse(txId: SecureHash) {
val signature = service.sign(txId)
val signature = service.signTransaction(txId)
otherSideSession.send(NotarisationResponse(listOf(signature)))
}

View File

@ -0,0 +1,86 @@
package net.corda.core.internal.notary
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.UniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
/** Base implementation for a notary service operated by a singe party. */
abstract class SinglePartyNotaryService : NotaryService() {
companion object {
private val staticLog = contextLogger()
}
protected open val log: Logger get() = staticLog
/** Handles input state uniqueness checks. */
protected abstract val uniquenessProvider: UniquenessProvider
/** Attempts to commit the specified transaction [txId]. */
@Suspendable
open fun commitInputStates(
inputs: List<StateRef>,
txId: SecureHash,
caller: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) {
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
// `UniquenessProvider`.
val callingFlow = FlowLogic.currentTopLevel
?: throw IllegalStateException("This method should be invoked in a flow context.")
val result = callingFlow.executeAsync(
CommitOperation(
this,
inputs,
txId,
caller,
requestSignature,
timeWindow,
references
)
)
if (result is UniquenessProvider.Result.Failure) {
throw NotaryInternalException(result.error)
}
}
/**
* Required for the flow to be able to suspend until the commit is complete.
* This object will be included in the flow checkpoint.
*/
@CordaSerializable
class CommitOperation(
val service: SinglePartyNotaryService,
val inputs: List<StateRef>,
val txId: SecureHash,
val caller: Party,
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>
) : FlowAsyncOperation<Result> {
override fun execute(deduplicationId: String): CordaFuture<Result> {
return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
}
}
/** Sign a single transaction. */
fun signTransaction(txId: SecureHash): TransactionSignature {
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
return services.keyManagementService.sign(signableData, notaryIdentityKey)
}
}

View File

@ -1,72 +0,0 @@
package net.corda.core.internal.notary
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
/**
* A base notary service implementation that provides functionality for cases where a signature by a single member
* of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary.
*/
abstract class TrustedAuthorityNotaryService : NotaryService() {
companion object {
private val staticLog = contextLogger()
}
protected open val log: Logger get() = staticLog
protected abstract val uniquenessProvider: UniquenessProvider
/**
* @throws NotaryException if any of the states have been consumed by a different transaction. Note that
* this method does not throw an exception when input states are present multiple times within the transaction.
*/
@JvmOverloads
@Suspendable
open fun commitInputStates(
inputs: List<StateRef>,
txId: SecureHash,
caller: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef> = emptyList()
) {
try {
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
} catch (e: NotaryInternalException) {
if (e.error is NotaryError.Conflict) {
val allInputs = inputs + references
val conflicts = allInputs.filterIndexed { _, stateRef ->
val cause = e.error.consumedStates[stateRef]
cause != null && cause.hashOfTransactionId != txId.sha256()
}
if (conflicts.isNotEmpty()) {
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
log.info("Notary conflicts for $txId: $conflicts")
throw e
}
} else throw e
} catch (e: Exception) {
log.error("Internal error", e)
throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later")))
}
}
/** Sign a [ByteArray] input. */
fun sign(bits: ByteArray): DigitalSignature.WithKey {
return services.keyManagementService.sign(bits, notaryIdentityKey)
}
/** Sign a single transaction. */
fun sign(txId: SecureHash): TransactionSignature {
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
return services.keyManagementService.sign(signableData, notaryIdentityKey)
}
// TODO: Sign multiple transactions at once by building their Merkle tree and then signing over its root.
}

View File

@ -1,16 +1,16 @@
package net.corda.core.internal.notary
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
/**
* A service that records input states of the given transaction and provides conflict information
* if any of the inputs have already been used in another transaction.
*
* A uniqueness provider is expected to be used from within the context of a flow.
*/
interface UniquenessProvider {
/** Commits all input states of the given transaction. */
@ -21,5 +21,13 @@ interface UniquenessProvider {
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow? = null,
references: List<StateRef> = emptyList()
)
): CordaFuture<Result>
/** The outcome of committing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
object Success : Result()
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()
}
}

View File

@ -1,8 +1,8 @@
package net.corda.notary.raft
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
@ -13,7 +13,7 @@ import java.security.PublicKey
class RaftNotaryService(
override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey
) : TrustedAuthorityNotaryService() {
) : SinglePartyNotaryService() {
private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present")

View File

@ -13,13 +13,14 @@ import io.atomix.copycat.server.CopycatServer
import io.atomix.copycat.server.cluster.Member
import io.atomix.copycat.server.storage.Storage
import io.atomix.copycat.server.storage.StorageLevel
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
@ -193,7 +194,7 @@ class RaftUniquenessProvider(
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) {
): CordaFuture<UniquenessProvider.Result> {
log.debug { "Attempting to commit input states: ${states.joinToString()}" }
val commitCommand = CommitTransaction(
states,
@ -203,10 +204,16 @@ class RaftUniquenessProvider(
timeWindow,
references
)
val commitError = client.submit(commitCommand).get()
if (commitError != null) throw NotaryInternalException(commitError)
log.debug { "All input states of transaction $txId have been committed" }
val future = openFuture<UniquenessProvider.Result>()
client.submit(commitCommand).thenAccept { commitError ->
val result = if (commitError != null) {
UniquenessProvider.Result.Failure(commitError)
} else {
log.debug { "All input states of transaction $txId have been committed" }
UniquenessProvider.Result.Success
}
future.set(result)
}
return future
}
}

View File

@ -10,9 +10,8 @@ import net.corda.core.flows.*
import net.corda.core.internal.*
import net.corda.core.internal.cordapp.CordappImpl
import net.corda.core.internal.cordapp.CordappInfoResolver
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.node.services.CordaService
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SerializationCustomSerializer
@ -154,8 +153,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
// the scanner won't find subclasses deeper down the hierarchy if any intermediate class is not
// present in the CorDapp.
val result = scanResult.getClassesWithSuperclass(NotaryService::class) +
scanResult.getClassesWithSuperclass(TrustedAuthorityNotaryService::class) +
scanResult.getClassesWithSuperclass(AsyncCFTNotaryService::class)
scanResult.getClassesWithSuperclass(SinglePartyNotaryService::class)
logger.info("Found notary service CorDapp implementations: " + result.joinToString(", "))
return result.firstOrNull()
}

View File

@ -5,14 +5,14 @@ import net.corda.core.contracts.ComponentGroupEnum
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotarisationPayload
import net.corda.core.flows.NotarisationRequest
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
/**
* The received transaction is not checked for contract-validity, as that would require fully
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction

View File

@ -12,7 +12,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
@ -36,7 +36,7 @@ import kotlin.concurrent.thread
/** A RDBMS backed Uniqueness provider */
@ThreadSafe
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() {
@MappedSuperclass
class BaseComittedState(
@ -77,7 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>,
val future: OpenFuture<AsyncUniquenessProvider.Result>)
val future: OpenFuture<UniquenessProvider.Result>)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
@ -133,15 +133,15 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
*
* Returns a future that will complete once the request is processed, containing the commit [Result].
*/
override fun commitAsync(
override fun commit(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
): CordaFuture<AsyncUniquenessProvider.Result> {
val future = openFuture<AsyncUniquenessProvider.Result>()
): CordaFuture<UniquenessProvider.Result> {
val future = openFuture<UniquenessProvider.Result>()
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future)
requestQueue.put(request)
return future
@ -232,13 +232,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
private fun respondWithError(request: CommitRequest, exception: Exception) {
if (exception is NotaryInternalException) {
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
request.future.set(UniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
}
private fun respondWithSuccess(request: CommitRequest) {
request.future.set(AsyncUniquenessProvider.Result.Success)
request.future.set(UniquenessProvider.Result.Success)
}
}

View File

@ -1,14 +1,14 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.schemas.MappedSchema
import net.corda.node.services.api.ServiceHubInternal
import java.security.PublicKey
/** An embedded notary service that uses the node's database to store committed states. */
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
private val notaryConfig = services.configuration.notary
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")

View File

@ -8,9 +8,9 @@ import net.corda.core.flows.NotarisationPayload
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotaryError
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionWithSignatures
import net.corda.core.transactions.WireTransaction
@ -22,7 +22,7 @@ import java.security.SignatureException
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
* indeed valid.
*/
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
/**
* Fully resolves the received transaction and its dependencies, runs contract verification logic and checks that
* the transaction in question has all required signatures apart from the notary's.

View File

@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
@ -12,8 +15,9 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.SignedTransaction
@ -52,19 +56,19 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
lateinit var notary: Party
lateinit var node: TestStartedNode
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
val replicaIds = (0 until clusterSize)
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
serviceLegalName)
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
val replicaIds = (0 until clusterSize)
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
serviceLegalName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
val notaryConfig = mock<NotaryConfig> {
whenever(it.serviceLegalName).thenReturn(serviceLegalName)
whenever(it.validating).thenReturn(true)
whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name)
}
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
val notaryConfig = mock<NotaryConfig> {
whenever(it.serviceLegalName).thenReturn(serviceLegalName)
whenever(it.validating).thenReturn(true)
whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name)
}
val notaryNodes = (0 until clusterSize).map {
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
@ -128,7 +132,8 @@ class TimedFlowTests {
/** The notary nodes don't run any consensus protocol, so 2 nodes are sufficient for the purpose of this test. */
private val globalRule = TimedFlowTestRule(2)
@ClassRule @JvmField
@ClassRule
@JvmField
val ruleChain = RuleChain.outerRule(globalDatabaseRule).around(globalRule)
}
@ -190,8 +195,16 @@ class TimedFlowTests {
}.bufferUntilSubscribed().toBlocking().toFuture()
}
class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val uniquenessProvider = mock<UniquenessProvider>()
class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
override val uniquenessProvider = object : UniquenessProvider {
/** A dummy commit method that immediately returns a success message. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<UniquenessProvider.Result> {
return openFuture<UniquenessProvider.Result>().apply {
set(UniquenessProvider.Result.Success)
}
}
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}

View File

@ -7,7 +7,7 @@ import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -24,7 +24,6 @@ import org.junit.Rule
import org.junit.Test
import java.time.Clock
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class PersistentUniquenessProviderTests {
@Rule
@ -51,9 +50,9 @@ class PersistentUniquenessProviderTests {
@Test
fun `should commit a transaction with unused inputs without exception`() {
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
val inputState = generateStateRef()
val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity, requestSignature)
provider.commit(listOf(inputState), txID, identity, requestSignature).get()
}
@Test
@ -63,15 +62,12 @@ class PersistentUniquenessProviderTests {
val inputs = listOf(inputState)
val firstTxId = txID
provider.commit(inputs, firstTxId, identity, requestSignature)
provider.commit(inputs, firstTxId, identity, requestSignature)
provider.commit(inputs, firstTxId, identity, requestSignature).get()
val secondTxId = SecureHash.randomSHA256()
val ex = assertFailsWith<NotaryInternalException> {
provider.commit(inputs, secondTxId, identity, requestSignature)
}
val error = ex.error as NotaryError.Conflict
val response: UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get()
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
val conflictCause = error.consumedStates[inputState]!!
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())

View File

@ -7,7 +7,7 @@ import net.corda.core.flows.*
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionWithSignatures
import net.corda.core.transactions.WireTransaction
@ -23,7 +23,7 @@ import java.security.SignatureException
* The notary-related APIs might change in the future.
*/
// START 1
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)

View File

@ -31,7 +31,6 @@ import net.corda.node.VersionInfo
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.NodeFlowManager
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices
@ -42,6 +41,7 @@ import net.corda.node.services.keys.KeyManagementServiceInternal
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.EnterpriseNamedCacheFactory
@ -433,7 +433,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
}
fun createUnstartedNode(parameters: InternalMockNodeParameters = InternalMockNodeParameters()): MockNode {
return createUnstartedNode(parameters, defaultFactory)
}
@ -502,11 +501,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
"MockNetwork.runNetwork() should only be used when networkSendManuallyPumped == false. " +
"You can use MockNetwork.waitQuiescent() to wait for all the nodes to process all the messages on their queues instead."
}
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
if (rounds == -1) {
while (pumpAll().any { it != null }) {
}
do {
awaitAsyncOperations()
} while (pumpAll())
} else {
repeat(rounds) {
pumpAll()
@ -514,6 +513,32 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
}
}
private fun pumpAll(): Boolean {
val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) }
return transferredMessages.any { it != null }
}
/**
* We wait for any flows that are suspended on an async operation completion to resume and either
* finish the flow, or generate a response message.
*/
private fun awaitAsyncOperations() {
while (anyFlowsSuspendedOnAsyncOperation()) {
Thread.sleep(50)
}
}
/** Returns true if there are any flows suspended waiting for an async operation to complete. */
private fun anyFlowsSuspendedOnAsyncOperation(): Boolean {
val allNodes = this._nodes
val allActiveFlows = allNodes.flatMap { it.smm.snapshot() }
return allActiveFlows.any {
val flowState = it.snapshot().checkpoint.flowState
flowState is FlowState.Started && flowState.flowIORequest is FlowIORequest.ExecuteAsyncOperation
}
}
@JvmOverloads
fun createPartyNode(legalName: CordaX500Name? = null): TestStartedNode {
return createNode(InternalMockNodeParameters(legalName = legalName))