mirror of
https://github.com/corda/corda.git
synced 2025-04-16 07:27:17 +00:00
Merge pull request #1532 from corda/andrius/merge-11-02
Andrius/merge 11 02
This commit is contained in:
commit
c0c997f6ce
@ -1,77 +0,0 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
/** Base notary implementation for a notary that supports asynchronous calls from a flow. */
|
||||
abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider: UniquenessProvider get() = asyncUniquenessProvider
|
||||
/** A uniqueness provider that supports asynchronous commits. */
|
||||
protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider
|
||||
|
||||
/**
|
||||
* @throws NotaryInternalException if any of the states have been consumed by a different transaction.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
|
||||
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
|
||||
// `UniquenessProvider`.
|
||||
val result = FlowLogic.currentTopLevel!!.executeAsync(AsyncCFTNotaryService.CommitOperation(this, inputs, txId, caller, requestSignature, timeWindow, references))
|
||||
if (result is Result.Failure) throw NotaryInternalException(result.error)
|
||||
}
|
||||
|
||||
/**
|
||||
* Commits the provided input states asynchronously.
|
||||
*
|
||||
* If a consumed state conflict is reported by the [asyncUniquenessProvider], but it is caused by the same
|
||||
* transaction – the transaction is getting notarised twice – a success response will be returned.
|
||||
*/
|
||||
private fun commitAsync(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<Result> {
|
||||
return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references)
|
||||
}
|
||||
|
||||
/**
|
||||
* Required for the flow to be able to suspend until the commit is complete.
|
||||
* This object will be included in the flow checkpoint.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class CommitOperation(
|
||||
val service: AsyncCFTNotaryService,
|
||||
val inputs: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val caller: Party,
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>
|
||||
): FlowAsyncOperation<Result> {
|
||||
override fun execute(deduplicationId: String): CordaFuture<Result> {
|
||||
return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,35 +0,0 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
* if any of the inputs have already been used in another transaction.
|
||||
*/
|
||||
interface AsyncUniquenessProvider : UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<Result>
|
||||
|
||||
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
|
||||
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references).get()
|
||||
if (result is Result.Failure) {
|
||||
throw NotaryInternalException(result.error)
|
||||
}
|
||||
}
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
object Success : Result()
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
||||
|
@ -7,8 +7,6 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
/**
|
||||
@ -20,7 +18,7 @@ import net.corda.core.utilities.unwrap
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>(), IdempotentFlow {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputsAndReferences = 10_000
|
||||
@ -37,7 +35,14 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp, parts.references)
|
||||
service.commitInputStates(
|
||||
parts.inputs,
|
||||
txId,
|
||||
otherSideSession.counterparty,
|
||||
requestPayload.requestSignature,
|
||||
parts.timestamp,
|
||||
parts.references
|
||||
)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
@ -78,7 +83,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
||||
|
||||
@Suspendable
|
||||
private fun signTransactionAndSendResponse(txId: SecureHash) {
|
||||
val signature = service.sign(txId)
|
||||
val signature = service.signTransaction(txId)
|
||||
otherSideSession.send(NotarisationResponse(listOf(signature)))
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,86 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
|
||||
/** Base implementation for a notary service operated by a singe party. */
|
||||
abstract class SinglePartyNotaryService : NotaryService() {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
protected open val log: Logger get() = staticLog
|
||||
|
||||
/** Handles input state uniqueness checks. */
|
||||
protected abstract val uniquenessProvider: UniquenessProvider
|
||||
|
||||
/** Attempts to commit the specified transaction [txId]. */
|
||||
@Suspendable
|
||||
open fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
|
||||
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
|
||||
// `UniquenessProvider`.
|
||||
|
||||
val callingFlow = FlowLogic.currentTopLevel
|
||||
?: throw IllegalStateException("This method should be invoked in a flow context.")
|
||||
val result = callingFlow.executeAsync(
|
||||
CommitOperation(
|
||||
this,
|
||||
inputs,
|
||||
txId,
|
||||
caller,
|
||||
requestSignature,
|
||||
timeWindow,
|
||||
references
|
||||
)
|
||||
)
|
||||
if (result is UniquenessProvider.Result.Failure) {
|
||||
throw NotaryInternalException(result.error)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Required for the flow to be able to suspend until the commit is complete.
|
||||
* This object will be included in the flow checkpoint.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class CommitOperation(
|
||||
val service: SinglePartyNotaryService,
|
||||
val inputs: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val caller: Party,
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>
|
||||
) : FlowAsyncOperation<Result> {
|
||||
|
||||
override fun execute(deduplicationId: String): CordaFuture<Result> {
|
||||
return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
}
|
||||
}
|
||||
|
||||
/** Sign a single transaction. */
|
||||
fun signTransaction(txId: SecureHash): TransactionSignature {
|
||||
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
||||
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||
}
|
||||
}
|
@ -1,72 +0,0 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
|
||||
/**
|
||||
* A base notary service implementation that provides functionality for cases where a signature by a single member
|
||||
* of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary.
|
||||
*/
|
||||
abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
protected open val log: Logger get() = staticLog
|
||||
protected abstract val uniquenessProvider: UniquenessProvider
|
||||
|
||||
/**
|
||||
* @throws NotaryException if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
@JvmOverloads
|
||||
@Suspendable
|
||||
open fun commitInputStates(
|
||||
inputs: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
caller: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef> = emptyList()
|
||||
) {
|
||||
try {
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
|
||||
} catch (e: NotaryInternalException) {
|
||||
if (e.error is NotaryError.Conflict) {
|
||||
val allInputs = inputs + references
|
||||
val conflicts = allInputs.filterIndexed { _, stateRef ->
|
||||
val cause = e.error.consumedStates[stateRef]
|
||||
cause != null && cause.hashOfTransactionId != txId.sha256()
|
||||
}
|
||||
if (conflicts.isNotEmpty()) {
|
||||
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
|
||||
log.info("Notary conflicts for $txId: $conflicts")
|
||||
throw e
|
||||
}
|
||||
} else throw e
|
||||
} catch (e: Exception) {
|
||||
log.error("Internal error", e)
|
||||
throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later")))
|
||||
}
|
||||
}
|
||||
|
||||
/** Sign a [ByteArray] input. */
|
||||
fun sign(bits: ByteArray): DigitalSignature.WithKey {
|
||||
return services.keyManagementService.sign(bits, notaryIdentityKey)
|
||||
}
|
||||
|
||||
/** Sign a single transaction. */
|
||||
fun sign(txId: SecureHash): TransactionSignature {
|
||||
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
||||
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||
}
|
||||
|
||||
// TODO: Sign multiple transactions at once by building their Merkle tree and then signing over its root.
|
||||
}
|
@ -1,16 +1,16 @@
|
||||
package net.corda.core.internal.notary
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.Party
|
||||
|
||||
/**
|
||||
* A service that records input states of the given transaction and provides conflict information
|
||||
* if any of the inputs have already been used in another transaction.
|
||||
*
|
||||
* A uniqueness provider is expected to be used from within the context of a flow.
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
@ -21,5 +21,13 @@ interface UniquenessProvider {
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow? = null,
|
||||
references: List<StateRef> = emptyList()
|
||||
)
|
||||
): CordaFuture<Result>
|
||||
|
||||
/** The outcome of committing a transaction. */
|
||||
sealed class Result {
|
||||
/** Indicates that all input states have been committed successfully. */
|
||||
object Success : Result()
|
||||
/** Indicates that the transaction has not been committed. */
|
||||
data class Failure(val error: NotaryError) : Result()
|
||||
}
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
package net.corda.notary.raft
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
@ -13,7 +13,7 @@ import java.security.PublicKey
|
||||
class RaftNotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey
|
||||
) : TrustedAuthorityNotaryService() {
|
||||
) : SinglePartyNotaryService() {
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${RaftNotaryService::class.java}: notary configuration not present")
|
||||
|
||||
|
@ -13,13 +13,14 @@ import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.cluster.Member
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -193,7 +194,7 @@ class RaftUniquenessProvider(
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
) {
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
log.debug { "Attempting to commit input states: ${states.joinToString()}" }
|
||||
val commitCommand = CommitTransaction(
|
||||
states,
|
||||
@ -203,10 +204,16 @@ class RaftUniquenessProvider(
|
||||
timeWindow,
|
||||
references
|
||||
)
|
||||
val commitError = client.submit(commitCommand).get()
|
||||
if (commitError != null) throw NotaryInternalException(commitError)
|
||||
log.debug { "All input states of transaction $txId have been committed" }
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
client.submit(commitCommand).thenAccept { commitError ->
|
||||
val result = if (commitError != null) {
|
||||
UniquenessProvider.Result.Failure(commitError)
|
||||
} else {
|
||||
log.debug { "All input states of transaction $txId have been committed" }
|
||||
UniquenessProvider.Result.Success
|
||||
}
|
||||
future.set(result)
|
||||
}
|
||||
return future
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,9 +10,8 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.cordapp.CordappImpl
|
||||
import net.corda.core.internal.cordapp.CordappInfoResolver
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
@ -154,8 +153,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
|
||||
// the scanner won't find subclasses deeper down the hierarchy if any intermediate class is not
|
||||
// present in the CorDapp.
|
||||
val result = scanResult.getClassesWithSuperclass(NotaryService::class) +
|
||||
scanResult.getClassesWithSuperclass(TrustedAuthorityNotaryService::class) +
|
||||
scanResult.getClassesWithSuperclass(AsyncCFTNotaryService::class)
|
||||
scanResult.getClassesWithSuperclass(SinglePartyNotaryService::class)
|
||||
logger.info("Found notary service CorDapp implementations: " + result.joinToString(", "))
|
||||
return result.firstOrNull()
|
||||
}
|
||||
|
@ -5,14 +5,14 @@ import net.corda.core.contracts.ComponentGroupEnum
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
/**
|
||||
* The received transaction is not checked for contract-validity, as that would require fully
|
||||
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction
|
||||
|
@ -12,7 +12,7 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
@ -36,7 +36,7 @@ import kotlin.concurrent.thread
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
@MappedSuperclass
|
||||
class BaseComittedState(
|
||||
@ -77,7 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>,
|
||||
val future: OpenFuture<AsyncUniquenessProvider.Result>)
|
||||
val future: OpenFuture<UniquenessProvider.Result>)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
@ -133,15 +133,15 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
*
|
||||
* Returns a future that will complete once the request is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commitAsync(
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<AsyncUniquenessProvider.Result> {
|
||||
val future = openFuture<AsyncUniquenessProvider.Result>()
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future)
|
||||
requestQueue.put(request)
|
||||
return future
|
||||
@ -232,13 +232,13 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
}
|
||||
|
||||
private fun respondWithSuccess(request: CommitRequest) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Success)
|
||||
request.future.set(UniquenessProvider.Result.Success)
|
||||
}
|
||||
}
|
||||
|
@ -1,14 +1,14 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import java.security.PublicKey
|
||||
|
||||
/** An embedded notary service that uses the node's database to store committed states. */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
|
@ -8,9 +8,9 @@ import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
@ -22,7 +22,7 @@ import java.security.SignatureException
|
||||
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
|
||||
* indeed valid.
|
||||
*/
|
||||
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
|
||||
/**
|
||||
* Fully resolves the received transaction and its dependencies, runs contract verification logic and checks that
|
||||
* the transaction in question has all required signatures apart from the notary's.
|
||||
|
@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -12,8 +15,9 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -52,19 +56,19 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||
lateinit var notary: Party
|
||||
lateinit var node: TestStartedNode
|
||||
|
||||
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
|
||||
val replicaIds = (0 until clusterSize)
|
||||
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
|
||||
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
|
||||
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
|
||||
serviceLegalName)
|
||||
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
|
||||
val replicaIds = (0 until clusterSize)
|
||||
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
|
||||
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
|
||||
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
|
||||
serviceLegalName)
|
||||
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
|
||||
val notaryConfig = mock<NotaryConfig> {
|
||||
whenever(it.serviceLegalName).thenReturn(serviceLegalName)
|
||||
whenever(it.validating).thenReturn(true)
|
||||
whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name)
|
||||
}
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
|
||||
val notaryConfig = mock<NotaryConfig> {
|
||||
whenever(it.serviceLegalName).thenReturn(serviceLegalName)
|
||||
whenever(it.validating).thenReturn(true)
|
||||
whenever(it.className).thenReturn(TimedFlowTests.TestNotaryService::class.java.name)
|
||||
}
|
||||
|
||||
val notaryNodes = (0 until clusterSize).map {
|
||||
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
|
||||
@ -128,7 +132,8 @@ class TimedFlowTests {
|
||||
/** The notary nodes don't run any consensus protocol, so 2 nodes are sufficient for the purpose of this test. */
|
||||
private val globalRule = TimedFlowTestRule(2)
|
||||
|
||||
@ClassRule @JvmField
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val ruleChain = RuleChain.outerRule(globalDatabaseRule).around(globalRule)
|
||||
|
||||
}
|
||||
@ -190,8 +195,16 @@ class TimedFlowTests {
|
||||
}.bufferUntilSubscribed().toBlocking().toFuture()
|
||||
}
|
||||
|
||||
class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = mock<UniquenessProvider>()
|
||||
class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
override val uniquenessProvider = object : UniquenessProvider {
|
||||
/** A dummy commit method that immediately returns a success message. */
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<UniquenessProvider.Result> {
|
||||
return openFuture<UniquenessProvider.Result>().apply {
|
||||
set(UniquenessProvider.Result.Success)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
|
||||
override fun start() {}
|
||||
override fun stop() {}
|
||||
|
@ -7,7 +7,7 @@ import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -24,7 +24,6 @@ import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class PersistentUniquenessProviderTests {
|
||||
@Rule
|
||||
@ -51,9 +50,9 @@ class PersistentUniquenessProviderTests {
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature).get()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -63,15 +62,12 @@ class PersistentUniquenessProviderTests {
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature).get()
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val response: UniquenessProvider.Result = provider.commit(inputs, secondTxId, identity, requestSignature).get()
|
||||
val error = (response as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
|
@ -1,8 +1,8 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
@ -12,12 +12,12 @@ import java.security.PublicKey
|
||||
/** Notary service backed by a replicated MySQL database. */
|
||||
class JPANotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
|
||||
override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
override val asyncUniquenessProvider = with(services) {
|
||||
override val uniquenessProvider = with(services) {
|
||||
val jpaNotaryConfig = try {
|
||||
notaryConfig.extraConfig!!.parseAs<JPANotaryConfiguration>()
|
||||
} catch (e: Exception) {
|
||||
@ -36,6 +36,6 @@ class JPANotaryService(
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
asyncUniquenessProvider.stop()
|
||||
uniquenessProvider.stop()
|
||||
}
|
||||
}
|
||||
|
@ -12,8 +12,8 @@ import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -38,7 +38,7 @@ import kotlin.concurrent.thread
|
||||
|
||||
/** A JPA backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, val config: JPANotaryConfiguration) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
// TODO: test vs. MySQLUniquenessProvider
|
||||
|
||||
@ -75,7 +75,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>,
|
||||
val future: OpenFuture<AsyncUniquenessProvider.Result>,
|
||||
val future: OpenFuture<UniquenessProvider.Result>,
|
||||
val requestEntity: Request,
|
||||
val committedStatesEntities: List<CommittedState>)
|
||||
|
||||
@ -132,15 +132,15 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
*
|
||||
* Returns a future that will complete once the requestEntitiy is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commitAsync(
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<AsyncUniquenessProvider.Result> {
|
||||
val future = openFuture<AsyncUniquenessProvider.Result>()
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
val requestEntities = Request(consumingTxHash = txId.toString(),
|
||||
partyName = callerIdentity.name.toString(),
|
||||
requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes,
|
||||
@ -215,7 +215,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
return findAlreadyCommitted(session, allInputs, references).toMutableMap()
|
||||
}
|
||||
|
||||
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): AsyncUniquenessProvider.Result {
|
||||
private fun processRequest(request: CommitRequest, allConflicts: MutableMap<StateRef, StateConsumptionDetails>, toCommit: MutableList<CommitRequest>): UniquenessProvider.Result {
|
||||
|
||||
val conflicts = (request.states + request.references).mapNotNull {
|
||||
if (allConflicts.containsKey(it)) it to allConflicts[it]!!
|
||||
@ -223,9 +223,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
}.toMap()
|
||||
val result = if (conflicts.isNotEmpty()) {
|
||||
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
|
||||
AsyncUniquenessProvider.Result.Success
|
||||
UniquenessProvider.Result.Success
|
||||
} else {
|
||||
AsyncUniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
|
||||
UniquenessProvider.Result.Failure(NotaryError.Conflict(request.txId, conflicts))
|
||||
}
|
||||
} else {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
|
||||
@ -235,9 +235,9 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
request.states.forEach {
|
||||
allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
|
||||
}
|
||||
AsyncUniquenessProvider.Result.Success
|
||||
UniquenessProvider.Result.Success
|
||||
} else {
|
||||
AsyncUniquenessProvider.Result.Failure(outsideTimeWindowError)
|
||||
UniquenessProvider.Result.Failure(outsideTimeWindowError)
|
||||
}
|
||||
}
|
||||
return result
|
||||
@ -275,7 +275,7 @@ class JPAUniquenessProvider(val clock: Clock, val database: CordaPersistence, va
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
|
@ -7,10 +7,8 @@ import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.config.toConfig
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.notary.jpa.JPAUniquenessProvider.Companion.decodeStateRef
|
||||
@ -27,7 +25,6 @@ import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class JPAUniquenessProviderTests {
|
||||
@Rule
|
||||
@ -54,11 +51,12 @@ class JPAUniquenessProviderTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
fun `should successfully commit a transaction with unused inputs`() {
|
||||
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
val result = provider.commit(listOf(inputState), txID, identity, requestSignature).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -68,13 +66,12 @@ class JPAUniquenessProviderTests {
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
val firstTxId = txID
|
||||
provider.commit(inputs, firstTxId, identity, requestSignature)
|
||||
val firstResult = provider.commit(inputs, firstTxId, identity, requestSignature).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, firstResult)
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
val secondResult = provider.commit(inputs, secondTxId, identity, requestSignature).get()
|
||||
val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
|
||||
@ -91,14 +88,15 @@ class JPAUniquenessProviderTests {
|
||||
val nrStates = notaryConfig.maxInputStates + notaryConfig.maxInputStates / 2
|
||||
val stateRefs = (1..nrStates).map { generateStateRef() }
|
||||
println(stateRefs.size)
|
||||
|
||||
val firstTxId = SecureHash.randomSHA256()
|
||||
val provider = JPAUniquenessProvider(Clock.systemUTC(), database, notaryConfig)
|
||||
provider.commit(stateRefs, firstTxId, identity, requestSignature)
|
||||
val firstResult = provider.commit(stateRefs, firstTxId, identity, requestSignature).get()
|
||||
assertEquals(UniquenessProvider.Result.Success, firstResult)
|
||||
|
||||
val secondTxId = SecureHash.randomSHA256()
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(stateRefs, secondTxId, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
val secondResult = provider.commit(stateRefs, secondTxId, identity, requestSignature).get()
|
||||
val error = (secondResult as UniquenessProvider.Result.Failure).error as NotaryError.Conflict
|
||||
assertEquals(nrStates, error.consumedStates.size)
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
package net.corda.notary.mysql
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
@ -12,7 +12,7 @@ import java.security.PublicKey
|
||||
/** Notary service backed by a replicated MySQL database. */
|
||||
class MySQLNotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
|
||||
override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
|
||||
/** Database table will be automatically created in dev mode */
|
||||
private val devMode = services.configuration.devMode
|
||||
@ -20,7 +20,7 @@ class MySQLNotaryService(
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
override val asyncUniquenessProvider = with(services) {
|
||||
override val uniquenessProvider = with(services) {
|
||||
val mysqlConfig = try {
|
||||
notaryConfig.extraConfig!!.parseAs<MySQLNotaryConfiguration>()
|
||||
} catch (e: Exception) {
|
||||
@ -41,10 +41,10 @@ class MySQLNotaryService(
|
||||
|
||||
|
||||
override fun start() {
|
||||
if (devMode) asyncUniquenessProvider.createTable()
|
||||
if (devMode) uniquenessProvider.createTable()
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
asyncUniquenessProvider.stop()
|
||||
uniquenessProvider.stop()
|
||||
}
|
||||
}
|
@ -19,9 +19,9 @@ import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
@ -49,7 +49,7 @@ class MySQLUniquenessProvider(
|
||||
metrics: MetricRegistry,
|
||||
val clock: Clock,
|
||||
val config: MySQLNotaryConfiguration
|
||||
) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<MySQLUniquenessProvider>()
|
||||
// TODO: optimize table schema for InnoDB
|
||||
@ -161,7 +161,7 @@ class MySQLUniquenessProvider(
|
||||
*
|
||||
* Returns a future that will complete once the request is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commitAsync(
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
|
@ -15,8 +15,8 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.UniquenessProvider.Result
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
@ -223,7 +223,7 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
if (requestSignature == null || random.nextInt(10) < 2) {
|
||||
requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
|
||||
}
|
||||
futures += AsyncCFTNotaryService.CommitOperation(
|
||||
futures += SinglePartyNotaryService.CommitOperation(
|
||||
service,
|
||||
inputs,
|
||||
txId,
|
||||
|
@ -7,7 +7,7 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
@ -23,7 +23,7 @@ import java.security.SignatureException
|
||||
* The notary-related APIs might change in the future.
|
||||
*/
|
||||
// START 1
|
||||
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||
|
@ -31,7 +31,6 @@ import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.internal.NodeFlowManager
|
||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
@ -42,6 +41,7 @@ import net.corda.node.services.keys.KeyManagementServiceInternal
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.EnterpriseNamedCacheFactory
|
||||
@ -433,7 +433,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
|
||||
|
||||
|
||||
fun createUnstartedNode(parameters: InternalMockNodeParameters = InternalMockNodeParameters()): MockNode {
|
||||
return createUnstartedNode(parameters, defaultFactory)
|
||||
}
|
||||
@ -502,11 +501,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
"MockNetwork.runNetwork() should only be used when networkSendManuallyPumped == false. " +
|
||||
"You can use MockNetwork.waitQuiescent() to wait for all the nodes to process all the messages on their queues instead."
|
||||
}
|
||||
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
|
||||
if (rounds == -1) {
|
||||
while (pumpAll().any { it != null }) {
|
||||
}
|
||||
do {
|
||||
awaitAsyncOperations()
|
||||
} while (pumpAll())
|
||||
} else {
|
||||
repeat(rounds) {
|
||||
pumpAll()
|
||||
@ -514,6 +513,32 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
}
|
||||
|
||||
private fun pumpAll(): Boolean {
|
||||
val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
return transferredMessages.any { it != null }
|
||||
}
|
||||
|
||||
/**
|
||||
* We wait for any flows that are suspended on an async operation completion to resume and either
|
||||
* finish the flow, or generate a response message.
|
||||
*/
|
||||
private fun awaitAsyncOperations() {
|
||||
while (anyFlowsSuspendedOnAsyncOperation()) {
|
||||
Thread.sleep(50)
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if there are any flows suspended waiting for an async operation to complete. */
|
||||
private fun anyFlowsSuspendedOnAsyncOperation(): Boolean {
|
||||
val allNodes = this._nodes
|
||||
val allActiveFlows = allNodes.flatMap { it.smm.snapshot() }
|
||||
|
||||
return allActiveFlows.any {
|
||||
val flowState = it.snapshot().checkpoint.flowState
|
||||
flowState is FlowState.Started && flowState.flowIORequest is FlowIORequest.ExecuteAsyncOperation
|
||||
}
|
||||
}
|
||||
|
||||
@JvmOverloads
|
||||
fun createPartyNode(legalName: CordaX500Name? = null): TestStartedNode {
|
||||
return createNode(InternalMockNodeParameters(legalName = legalName))
|
||||
|
@ -10,20 +10,19 @@ import net.corda.core.crypto.entropyToKeyPair
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import java.math.BigInteger
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@StartableByRPC
|
||||
open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
|
||||
open class AsyncLoadTestFlow<T : SinglePartyNotaryService>(
|
||||
private val serviceType: Class<T>,
|
||||
private val transactionCount: Int,
|
||||
private val batchSize: Int = 100,
|
||||
@ -57,7 +56,7 @@ open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
|
||||
|
||||
private fun runBatch(transactionCount: Int): Long {
|
||||
val stopwatch = Stopwatch.createStarted()
|
||||
val futures = mutableListOf<CordaFuture<AsyncUniquenessProvider.Result>>()
|
||||
val futures = mutableListOf<CordaFuture<UniquenessProvider.Result>>()
|
||||
|
||||
val service = serviceHub.cordaService(serviceType)
|
||||
|
||||
@ -72,7 +71,7 @@ open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
|
||||
val inputs = inputGenerator.generateOrFail(random)
|
||||
val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
|
||||
|
||||
futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature,
|
||||
futures += SinglePartyNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature,
|
||||
null, emptyList()).execute("")
|
||||
}
|
||||
|
||||
|
@ -1,67 +0,0 @@
|
||||
package net.corda.notarytest.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.base.Stopwatch
|
||||
import net.corda.client.mock.Generator
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.entropyToKeyPair
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import java.math.BigInteger
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@StartableByRPC
|
||||
open class LoadTestFlow<T : TrustedAuthorityNotaryService>(
|
||||
private val serviceType: Class<T>,
|
||||
private val transactionCount: Int,
|
||||
/**
|
||||
* Number of input states per transaction.
|
||||
* If *null*, variable sized transactions will be created with median size 4.
|
||||
*/
|
||||
private val inputStateCount: Int?
|
||||
) : FlowLogic<Long>() {
|
||||
private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) }
|
||||
private val publicKeyGenerator = keyPairGenerator.map { it.public }
|
||||
|
||||
private val publicKeyGenerator2 = Generator.pure(generateKeyPair().public)
|
||||
private val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGenerator2) { n, key ->
|
||||
Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key)
|
||||
}
|
||||
private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) }
|
||||
private val stateRefGenerator = Generator.intRange(0, 10).map { StateRef(SecureHash.randomSHA256(), it) }
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Long {
|
||||
val stopwatch = Stopwatch.createStarted()
|
||||
val random = SplittableRandom()
|
||||
|
||||
for (i in 1..transactionCount) {
|
||||
val txId: SecureHash = txIdGenerator.generateOrFail(random)
|
||||
val callerParty = partyGenerator.generateOrFail(random)
|
||||
val inputGenerator = if (inputStateCount == null) {
|
||||
Generator.replicatePoisson(4.0, stateRefGenerator, true)
|
||||
} else {
|
||||
Generator.replicate(inputStateCount, stateRefGenerator)
|
||||
}
|
||||
val inputs = inputGenerator.generateOrFail(random)
|
||||
val localStopwatch = Stopwatch.createStarted()
|
||||
val sig = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
|
||||
serviceHub.cordaService(serviceType).commitInputStates(inputs, txId, callerParty, sig, null)
|
||||
logger.info("Committed a transaction ${txId.toString().take(10)} with ${inputs.size} inputs in ${localStopwatch.stop().elapsed(TimeUnit.MILLISECONDS)} ms")
|
||||
}
|
||||
|
||||
stopwatch.stop()
|
||||
val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS)
|
||||
logger.info("Committed $transactionCount transactions in $duration, avg ${duration.toDouble() / transactionCount} ms")
|
||||
|
||||
return duration
|
||||
}
|
||||
}
|
@ -7,7 +7,7 @@ import com.codahale.metrics.graphite.PickledGraphite
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.internal.notary.AsyncCFTNotaryService
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -24,19 +24,19 @@ import java.security.PublicKey
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@CordaService
|
||||
class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
|
||||
class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
private val appConfig = ConfigHelper.loadConfig(Paths.get(".")).getConfig("custom")
|
||||
|
||||
override val asyncUniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider()
|
||||
override val uniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider()
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
override fun start() {
|
||||
asyncUniquenessProvider.createTable()
|
||||
uniquenessProvider.createTable()
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
asyncUniquenessProvider.stop()
|
||||
uniquenessProvider.stop()
|
||||
}
|
||||
|
||||
private fun createMetricsRegistry(): MetricRegistry {
|
||||
|
Loading…
x
Reference in New Issue
Block a user