mirror of
https://github.com/corda/corda.git
synced 2024-12-21 05:53:23 +00:00
EG-2375 - batching notary open sourcing. (#6507)
This commit is contained in:
parent
1e6be340eb
commit
52cbe04b8c
@ -25,7 +25,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.notary.jpa.JPAUniquenessProvider
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
@ -856,8 +856,8 @@ class VaultObserverExceptionTest {
|
||||
override fun call(): List<String> {
|
||||
return serviceHub.withEntityManager {
|
||||
val criteriaQuery = this.criteriaBuilder.createQuery(String::class.java)
|
||||
val root = criteriaQuery.from(PersistentUniquenessProvider.CommittedTransaction::class.java)
|
||||
criteriaQuery.select(root.get<String>(PersistentUniquenessProvider.CommittedTransaction::transactionId.name))
|
||||
val root = criteriaQuery.from(JPAUniquenessProvider.CommittedTransaction::class.java)
|
||||
criteriaQuery.select(root.get(JPAUniquenessProvider.CommittedTransaction::transactionId.name))
|
||||
val query = this.createQuery(criteriaQuery)
|
||||
query.resultList
|
||||
}
|
||||
|
@ -133,7 +133,6 @@ import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.BasicVerifierFactoryService
|
||||
import net.corda.node.services.transactions.DeterministicVerifierFactoryService
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.services.transactions.VerifierFactoryService
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
@ -792,10 +791,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
)
|
||||
}
|
||||
|
||||
private fun isRunningSimpleNotaryService(configuration: NodeConfiguration): Boolean {
|
||||
return configuration.notary != null && configuration.notary?.className == SimpleNotaryService::class.java.name
|
||||
}
|
||||
|
||||
private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause)
|
||||
|
||||
private fun installCordaServices() {
|
||||
|
@ -6,12 +6,12 @@ import net.corda.core.flows.ContractUpgradeFlow
|
||||
import net.corda.core.internal.cordapp.CordappImpl
|
||||
import net.corda.core.internal.location
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.transactions.NodeNotarySchemaV1
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.notary.experimental.bftsmart.BFTSmartNotarySchemaV1
|
||||
import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService
|
||||
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
|
||||
import net.corda.notary.experimental.raft.RaftNotaryService
|
||||
import net.corda.notary.jpa.JPANotarySchemaV1
|
||||
import net.corda.notary.jpa.JPANotaryService
|
||||
|
||||
internal object VirtualCordapp {
|
||||
/** A list of the core RPC flows present in Corda */
|
||||
@ -46,7 +46,7 @@ internal object VirtualCordapp {
|
||||
}
|
||||
|
||||
/** A Cordapp for the built-in notary service implementation. */
|
||||
fun generateSimpleNotary(versionInfo: VersionInfo): CordappImpl {
|
||||
fun generateJPANotary(versionInfo: VersionInfo): CordappImpl {
|
||||
return CordappImpl(
|
||||
contractClassNames = listOf(),
|
||||
initiatedFlows = listOf(),
|
||||
@ -57,15 +57,16 @@ internal object VirtualCordapp {
|
||||
serializationWhitelists = listOf(),
|
||||
serializationCustomSerializers = listOf(),
|
||||
checkpointCustomSerializers = listOf(),
|
||||
customSchemas = setOf(NodeNotarySchemaV1),
|
||||
customSchemas = setOf(JPANotarySchemaV1),
|
||||
info = Cordapp.Info.Default("corda-notary", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"),
|
||||
allFlows = listOf(),
|
||||
jarPath = SimpleNotaryService::class.java.location,
|
||||
jarPath = JPANotaryService::class.java.location,
|
||||
jarHash = SecureHash.allOnesHash,
|
||||
minimumPlatformVersion = versionInfo.platformVersion,
|
||||
targetPlatformVersion = versionInfo.platformVersion,
|
||||
notaryService = SimpleNotaryService::class.java,
|
||||
isLoaded = false
|
||||
notaryService = JPANotaryService::class.java,
|
||||
isLoaded = false,
|
||||
isVirtual = true
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,6 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
|
||||
// when mapped schemas from the finance module are present, they are considered as internal ones
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1" ||
|
||||
schema::class.qualifiedName?.startsWith("net.corda.notary.") ?: false
|
||||
}
|
||||
|
||||
|
@ -1,49 +0,0 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import java.security.PublicKey
|
||||
|
||||
/** An embedded notary service that uses the node's database to store committed states. */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
init {
|
||||
val mode = if (notaryConfig.validating) "validating" else "non-validating"
|
||||
log.info("Starting notary in $mode mode")
|
||||
}
|
||||
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(
|
||||
services.clock,
|
||||
services.database,
|
||||
services.cacheFactory,
|
||||
::signTransaction)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
|
||||
return if (notaryConfig.validating) {
|
||||
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
} else {
|
||||
NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
override fun start() {}
|
||||
override fun stop() {}
|
||||
}
|
||||
|
||||
// Entities used by a Notary
|
||||
object NodeNotarySchema
|
||||
|
||||
object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaClass, version = 1,
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
||||
PersistentUniquenessProvider.Request::class.java,
|
||||
PersistentUniquenessProvider.CommittedState::class.java,
|
||||
PersistentUniquenessProvider.CommittedTransaction::class.java
|
||||
)) {
|
||||
override val migrationResource = "node-notary.changelog-master"
|
||||
}
|
@ -9,10 +9,10 @@ import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.cordapp.VirtualCordapp
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.nodeapi.internal.cordapp.CordappLoader
|
||||
import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService
|
||||
import net.corda.notary.experimental.raft.RaftNotaryService
|
||||
import net.corda.notary.jpa.JPANotaryService
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.security.PublicKey
|
||||
|
||||
@ -44,8 +44,8 @@ class NotaryLoader(
|
||||
RaftNotaryService::class.java
|
||||
}
|
||||
else -> {
|
||||
builtInNotary = VirtualCordapp.generateSimpleNotary(versionInfo)
|
||||
SimpleNotaryService::class.java
|
||||
builtInNotary = VirtualCordapp.generateJPANotary(versionInfo)
|
||||
JPANotaryService::class.java
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
54
node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt
Normal file
54
node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt
Normal file
@ -0,0 +1,54 @@
|
||||
package net.corda.notary.common
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.MerkleTree
|
||||
import net.corda.core.crypto.PartialMerkleTree
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.node.ServiceHub
|
||||
import java.security.PublicKey
|
||||
|
||||
typealias BatchSigningFunction = (Iterable<SecureHash>) -> BatchSignature
|
||||
|
||||
/** Generates a signature over the bach of [txIds]. */
|
||||
fun signBatch(
|
||||
txIds: Iterable<SecureHash>,
|
||||
notaryIdentityKey: PublicKey,
|
||||
services: ServiceHub
|
||||
): BatchSignature {
|
||||
val merkleTree = MerkleTree.getMerkleTree(txIds.map { it.sha256() })
|
||||
val merkleTreeRoot = merkleTree.hash
|
||||
val signableData = SignableData(
|
||||
merkleTreeRoot,
|
||||
SignatureMetadata(
|
||||
services.myInfo.platformVersion,
|
||||
Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID
|
||||
)
|
||||
)
|
||||
val sig = services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||
return BatchSignature(sig, merkleTree)
|
||||
}
|
||||
|
||||
/** The outcome of just committing a transaction. */
|
||||
sealed class InternalResult {
|
||||
object Success : InternalResult()
|
||||
data class Failure(val error: NotaryError) : InternalResult()
|
||||
}
|
||||
|
||||
data class BatchSignature(
|
||||
val rootSignature: TransactionSignature,
|
||||
val fullMerkleTree: MerkleTree) {
|
||||
/** Extracts a signature with a partial Merkle tree for the specified leaf in the batch signature. */
|
||||
fun forParticipant(txId: SecureHash): TransactionSignature {
|
||||
return TransactionSignature(
|
||||
rootSignature.bytes,
|
||||
rootSignature.by,
|
||||
rootSignature.signatureMetadata,
|
||||
PartialMerkleTree.build(fullMerkleTree, listOf(txId.sha256()))
|
||||
)
|
||||
}
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
data class JPANotaryConfiguration(
|
||||
val batchSize: Int = 32,
|
||||
val batchTimeoutMs: Long = 200L,
|
||||
val maxInputStates: Int = 2000,
|
||||
val maxDBTransactionRetryCount: Int = 10,
|
||||
val backOffBaseMs: Long = 20L
|
||||
)
|
@ -0,0 +1,55 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.internal.notary.SinglePartyNotaryService
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.NonValidatingNotaryFlow
|
||||
import net.corda.node.services.transactions.ValidatingNotaryFlow
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.notary.common.signBatch
|
||||
import java.security.PublicKey
|
||||
|
||||
/** Notary service backed by a relational database. */
|
||||
class JPANotaryService(
|
||||
override val services: ServiceHubInternal,
|
||||
override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
|
||||
|
||||
private val notaryConfig = services.configuration.notary
|
||||
?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present")
|
||||
|
||||
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
override val uniquenessProvider = with(services) {
|
||||
val jpaNotaryConfig = try {
|
||||
notaryConfig.extraConfig?.parseAs() ?: JPANotaryConfiguration()
|
||||
} catch (e: Exception) {
|
||||
throw IllegalArgumentException("Failed to register ${JPANotaryService::class.java}: extra notary configuration parameters invalid")
|
||||
}
|
||||
JPAUniquenessProvider(
|
||||
clock,
|
||||
database,
|
||||
jpaNotaryConfig,
|
||||
configuration.myLegalName,
|
||||
::signTransactionBatch
|
||||
)
|
||||
}
|
||||
|
||||
private fun signTransactionBatch(txIds: Iterable<SecureHash>)
|
||||
= signBatch(txIds, notaryIdentityKey, services)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
|
||||
return if (notaryConfig.validating) {
|
||||
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
} else NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
uniquenessProvider.stop()
|
||||
}
|
||||
}
|
@ -0,0 +1,408 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import com.google.common.collect.Queues
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.notary.common.BatchSigningFunction
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.notary.common.InternalResult
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding
|
||||
import org.hibernate.Session
|
||||
import java.sql.SQLException
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.EmbeddedId
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.Lob
|
||||
import javax.persistence.NamedQuery
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/** A JPA backed Uniqueness provider */
|
||||
@Suppress("MagicNumber") // database column length
|
||||
@ThreadSafe
|
||||
class JPAUniquenessProvider(
|
||||
val clock: Clock,
|
||||
val database: CordaPersistence,
|
||||
val config: JPANotaryConfiguration = JPANotaryConfiguration(),
|
||||
val notaryWorkerName: CordaX500Name,
|
||||
val signBatch: BatchSigningFunction
|
||||
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
// This is the prefix of the ID in the request log table, to allow running multiple instances that access the
|
||||
// same table.
|
||||
private val instanceId = UUID.randomUUID()
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log")
|
||||
@CordaSerializable
|
||||
class Request(
|
||||
@Id
|
||||
@Column(nullable = true, length = 76)
|
||||
var id: String? = null,
|
||||
|
||||
@Column(name = "consuming_transaction_id", nullable = true, length = 64)
|
||||
val consumingTxHash: String?,
|
||||
|
||||
@Column(name = "requesting_party_name", nullable = true, length = 255)
|
||||
var partyName: String?,
|
||||
|
||||
@Lob
|
||||
@Column(name = "request_signature", nullable = false)
|
||||
val requestSignature: ByteArray,
|
||||
|
||||
@Column(name = "request_timestamp", nullable = false)
|
||||
var requestDate: Instant,
|
||||
|
||||
@Column(name = "worker_node_x500_name", nullable = true, length = 255)
|
||||
val workerNodeX500Name: String?
|
||||
)
|
||||
|
||||
private data class CommitRequest(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val callerIdentity: Party,
|
||||
val requestSignature: NotarisationRequestSignature,
|
||||
val timeWindow: TimeWindow?,
|
||||
val references: List<StateRef>,
|
||||
val future: OpenFuture<UniquenessProvider.Result>,
|
||||
val requestEntity: Request,
|
||||
val committedStatesEntities: List<CommittedState>)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
@NamedQuery(name = "CommittedState.select", query = "SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids")
|
||||
class CommittedState(
|
||||
@EmbeddedId
|
||||
val id: PersistentStateRef,
|
||||
@Column(name = "consuming_transaction_id", nullable = false, length = 64)
|
||||
val consumingTxHash: String)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_txs")
|
||||
class CommittedTransaction(
|
||||
@Id
|
||||
@Column(name = "transaction_id", nullable = false, length = 64)
|
||||
val transactionId: String
|
||||
)
|
||||
|
||||
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
|
||||
|
||||
/** A requestEntity processor thread. */
|
||||
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
|
||||
try {
|
||||
val buffer = LinkedList<CommitRequest>()
|
||||
while (!Thread.interrupted()) {
|
||||
val drainedSize = Queues.drain(requestQueue, buffer, config.batchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
if (drainedSize == 0) continue
|
||||
processRequests(buffer)
|
||||
buffer.clear()
|
||||
}
|
||||
} catch (_: InterruptedException) {
|
||||
log.debug { "Process interrupted."}
|
||||
}
|
||||
log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
processorThread.interrupt()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val requestQueueSize = 100_000
|
||||
private const val jdbcBatchSize = 100_000
|
||||
private val log = contextLogger()
|
||||
|
||||
fun encodeStateRef(s: StateRef): PersistentStateRef {
|
||||
return PersistentStateRef(s.txhash.toString(), s.index)
|
||||
}
|
||||
|
||||
fun decodeStateRef(s: PersistentStateRef): StateRef {
|
||||
return StateRef(txhash = SecureHash.parse(s.txId), index = s.index)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates and adds a [CommitRequest] to the requestEntity queue. If the requestEntity queue is full, this method will block
|
||||
* until space is available.
|
||||
*
|
||||
* Returns a future that will complete once the requestEntity is processed, containing the commit [Result].
|
||||
*/
|
||||
override fun commit(
|
||||
states: List<StateRef>,
|
||||
txId: SecureHash,
|
||||
callerIdentity: Party,
|
||||
requestSignature: NotarisationRequestSignature,
|
||||
timeWindow: TimeWindow?,
|
||||
references: List<StateRef>
|
||||
): CordaFuture<UniquenessProvider.Result> {
|
||||
val future = openFuture<UniquenessProvider.Result>()
|
||||
val requestEntities = Request(consumingTxHash = txId.toString(),
|
||||
partyName = callerIdentity.name.toString(),
|
||||
requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes,
|
||||
requestDate = clock.instant(),
|
||||
workerNodeX500Name = notaryWorkerName.toString())
|
||||
val stateEntities = states.map {
|
||||
CommittedState(
|
||||
encodeStateRef(it),
|
||||
txId.toString()
|
||||
)
|
||||
}
|
||||
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future, requestEntities, stateEntities)
|
||||
|
||||
requestQueue.put(request)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
// Safe up to 100k requests per second.
|
||||
private var nextRequestId = System.currentTimeMillis() * 100
|
||||
|
||||
private fun logRequests(requests: List<CommitRequest>) {
|
||||
database.transaction {
|
||||
for (request in requests) {
|
||||
request.requestEntity.id = "$instanceId:${(nextRequestId++).toString(16)}"
|
||||
session.persist(request.requestEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun commitRequests(session: Session, requests: List<CommitRequest>) {
|
||||
for (request in requests) {
|
||||
for (cs in request.committedStatesEntities) {
|
||||
session.persist(cs)
|
||||
}
|
||||
session.persist(CommittedTransaction(request.txId.toString()))
|
||||
}
|
||||
}
|
||||
|
||||
private fun findAlreadyCommitted(session: Session, states: List<StateRef>, references: List<StateRef>): Map<StateRef, StateConsumptionDetails> {
|
||||
val persistentStateRefs = (states + references).map { encodeStateRef(it) }.toSet()
|
||||
val committedStates = mutableListOf<CommittedState>()
|
||||
|
||||
for (idsBatch in persistentStateRefs.chunked(config.maxInputStates)) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val existing = session
|
||||
.createNamedQuery("CommittedState.select")
|
||||
.setParameter("ids", idsBatch)
|
||||
.resultList as List<CommittedState>
|
||||
committedStates.addAll(existing)
|
||||
}
|
||||
|
||||
return committedStates.map {
|
||||
val stateRef = StateRef(txhash = SecureHash.parse(it.id.txId), index = it.id.index)
|
||||
val consumingTxId = SecureHash.parse(it.consumingTxHash)
|
||||
if (stateRef in references) {
|
||||
stateRef to StateConsumptionDetails(consumingTxId.sha256(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||
} else {
|
||||
stateRef to StateConsumptionDetails(consumingTxId.sha256())
|
||||
}
|
||||
}.toMap()
|
||||
}
|
||||
|
||||
private fun<T> withRetry(block: () -> T): T {
|
||||
var retryCount = 0
|
||||
var backOff = config.backOffBaseMs
|
||||
var exceptionCaught: SQLException? = null
|
||||
while (retryCount <= config.maxDBTransactionRetryCount) {
|
||||
try {
|
||||
val res = block()
|
||||
return res
|
||||
} catch (e: SQLException) {
|
||||
retryCount++
|
||||
Thread.sleep(backOff)
|
||||
backOff *= 2
|
||||
exceptionCaught = e
|
||||
}
|
||||
}
|
||||
throw exceptionCaught!!
|
||||
}
|
||||
|
||||
private fun findAllConflicts(session: Session, requests: List<CommitRequest>): MutableMap<StateRef, StateConsumptionDetails> {
|
||||
log.info("Processing notarization requests with ${requests.sumBy { it.states.size }} input states and ${requests.sumBy { it.references.size }} references")
|
||||
|
||||
val allStates = requests.flatMap { it.states }
|
||||
val allReferences = requests.flatMap { it.references }
|
||||
return findAlreadyCommitted(session, allStates, allReferences).toMutableMap()
|
||||
}
|
||||
|
||||
private fun processRequest(
|
||||
session: Session,
|
||||
request: CommitRequest,
|
||||
consumedStates: MutableMap<StateRef, StateConsumptionDetails>,
|
||||
processedTxIds: MutableMap<SecureHash, InternalResult>,
|
||||
toCommit: MutableList<CommitRequest>
|
||||
): InternalResult {
|
||||
val conflicts = (request.states + request.references).mapNotNull {
|
||||
if (consumedStates.containsKey(it)) it to consumedStates[it]!!
|
||||
else null
|
||||
}.toMap()
|
||||
|
||||
return if (conflicts.isNotEmpty()) {
|
||||
handleStateConflicts(request, conflicts, session)
|
||||
} else {
|
||||
handleNoStateConflicts(request, toCommit, consumedStates, processedTxIds, session)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the [request] given there are conflicting states already present in the DB or current batch.
|
||||
*
|
||||
* To ensure idempotency, if the request's transaction matches a previously consumed transaction then the
|
||||
* same result (success) can be returned without committing it to the DB. Failure is only returned in the
|
||||
* case where the request is not a duplicate of a previously processed request and hence it is a genuine
|
||||
* double spend attempt.
|
||||
*/
|
||||
private fun handleStateConflicts(
|
||||
request: CommitRequest,
|
||||
stateConflicts: Map<StateRef, StateConsumptionDetails>,
|
||||
session: Session
|
||||
): InternalResult {
|
||||
return when {
|
||||
isConsumedByTheSameTx(request.txId.sha256(), stateConflicts) -> {
|
||||
InternalResult.Success
|
||||
}
|
||||
request.states.isEmpty() && isPreviouslyNotarised(session, request.txId) -> {
|
||||
InternalResult.Success
|
||||
}
|
||||
else -> {
|
||||
InternalResult.Failure(NotaryError.Conflict(request.txId, stateConflicts))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the [request] given there are no conflicting states already present in the DB or current batch.
|
||||
*
|
||||
* This method performs time window validation and adds the request to the commit list if applicable.
|
||||
* It also checks the [processedTxIds] map to ensure that any time-window only duplicates within the batch
|
||||
* are only committed once.
|
||||
*/
|
||||
private fun handleNoStateConflicts(
|
||||
request: CommitRequest,
|
||||
toCommit: MutableList<CommitRequest>,
|
||||
consumedStates: MutableMap<StateRef, StateConsumptionDetails>,
|
||||
processedTxIds: MutableMap<SecureHash, InternalResult>,
|
||||
session: Session
|
||||
): InternalResult {
|
||||
return when {
|
||||
request.states.isEmpty() && isPreviouslyNotarised(session, request.txId) -> {
|
||||
InternalResult.Success
|
||||
}
|
||||
processedTxIds.containsKey(request.txId) -> {
|
||||
processedTxIds[request.txId]!!
|
||||
}
|
||||
else -> {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
|
||||
val internalResult = if (outsideTimeWindowError != null) {
|
||||
InternalResult.Failure(outsideTimeWindowError)
|
||||
} else {
|
||||
// Mark states as consumed to capture conflicting transactions in the same batch
|
||||
request.states.forEach {
|
||||
consumedStates[it] = StateConsumptionDetails(request.txId.sha256())
|
||||
}
|
||||
toCommit.add(request)
|
||||
InternalResult.Success
|
||||
}
|
||||
// Store transaction result to capture conflicting time-window only transactions in the same batch
|
||||
processedTxIds[request.txId] = internalResult
|
||||
internalResult
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun isPreviouslyNotarised(session: Session, txId: SecureHash): Boolean {
|
||||
return session.find(CommittedTransaction::class.java, txId.toString()) != null
|
||||
}
|
||||
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
private fun processRequests(requests: List<CommitRequest>) {
|
||||
try {
|
||||
// Note that there is an additional retry mechanism within the transaction itself.
|
||||
val res = withRetry {
|
||||
database.transaction {
|
||||
val em = session.entityManagerFactory.createEntityManager()
|
||||
em.unwrap(Session::class.java).jdbcBatchSize = jdbcBatchSize
|
||||
|
||||
val toCommit = mutableListOf<CommitRequest>()
|
||||
val consumedStates = findAllConflicts(session, requests)
|
||||
val processedTxIds = mutableMapOf<SecureHash, InternalResult>()
|
||||
|
||||
val results = requests.map { request ->
|
||||
processRequest(session, request, consumedStates, processedTxIds, toCommit)
|
||||
}
|
||||
|
||||
logRequests(requests)
|
||||
commitRequests(session, toCommit)
|
||||
|
||||
results
|
||||
}
|
||||
}
|
||||
completeResponses(requests, res)
|
||||
} catch (e: Exception) {
|
||||
log.warn("Error processing commit requests", e)
|
||||
for (request in requests) {
|
||||
respondWithError(request, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun completeResponses(requests: List<CommitRequest>, results: List<InternalResult>): Int {
|
||||
val zippedResults = requests.zip(results)
|
||||
val successfulRequests = zippedResults
|
||||
.filter { it.second is InternalResult.Success }
|
||||
.map { it.first.txId }
|
||||
.distinct()
|
||||
val signature = if (successfulRequests.isNotEmpty())
|
||||
signBatch(successfulRequests)
|
||||
else null
|
||||
|
||||
var inputStateCount = 0
|
||||
for ((request, result) in zippedResults) {
|
||||
val resultToSet = when {
|
||||
result is InternalResult.Failure -> UniquenessProvider.Result.Failure(result.error)
|
||||
signature != null -> UniquenessProvider.Result.Success(signature.forParticipant(request.txId))
|
||||
else -> throw IllegalStateException("Signature is required but not found")
|
||||
}
|
||||
|
||||
request.future.set(resultToSet)
|
||||
inputStateCount += request.states.size
|
||||
}
|
||||
return inputStateCount
|
||||
}
|
||||
|
||||
private fun respondWithError(request: CommitRequest, exception: Exception) {
|
||||
if (exception is NotaryInternalException) {
|
||||
request.future.set(UniquenessProvider.Result.Failure(exception.error))
|
||||
} else {
|
||||
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
|
||||
}
|
||||
}
|
||||
}
|
18
node/src/main/kotlin/net/corda/notary/jpa/Schema.kt
Normal file
18
node/src/main/kotlin/net/corda/notary/jpa/Schema.kt
Normal file
@ -0,0 +1,18 @@
|
||||
package net.corda.notary.jpa
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
|
||||
object JPANotarySchema
|
||||
|
||||
object JPANotarySchemaV1 : MappedSchema(
|
||||
schemaFamily = JPANotarySchema.javaClass,
|
||||
version = 1,
|
||||
mappedTypes = listOf(
|
||||
JPAUniquenessProvider.CommittedState::class.java,
|
||||
JPAUniquenessProvider.Request::class.java,
|
||||
JPAUniquenessProvider.CommittedTransaction::class.java
|
||||
)
|
||||
) {
|
||||
override val migrationResource: String?
|
||||
get() = "node-notary.changelog-master"
|
||||
}
|
@ -9,5 +9,7 @@
|
||||
<include file="migration/node-notary.changelog-v2.xml"/>
|
||||
<include file="migration/node-notary.changelog-pkey.xml"/>
|
||||
<include file="migration/node-notary.changelog-committed-transactions-table.xml" />
|
||||
<include file="migration/node-notary.changelog-v3.xml" />
|
||||
<include file="migration/node-notary.changelog-worker-logging.xml" />
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -0,0 +1,48 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="create-notary-committed-transactions-table" logicalFilePath="migration/node-notary.changelog-committed-transactions-table.xml">
|
||||
<createTable tableName="node_notary_committed_txs">
|
||||
<column name="transaction_id" type="NVARCHAR(64)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</createTable>
|
||||
<addPrimaryKey columnNames="transaction_id" constraintName="node_notary_transactions_pkey" tableName="node_notary_committed_txs"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet id="notary-request-log-change-id-type-oracle" author="R3.Corda">
|
||||
<preConditions onFail="MARK_RAN">
|
||||
<dbms type="oracle"/>
|
||||
</preConditions>
|
||||
<!--
|
||||
For Oracle it's not possible to modify the data type for a column with existing values.
|
||||
So we create a new column with the right type, copy over the values, drop the original one and rename the new one.
|
||||
-->
|
||||
<addColumn tableName="node_notary_request_log">
|
||||
<column name="id_temp" type="NVARCHAR(76)"/>
|
||||
</addColumn>
|
||||
<!-- Copy old values from the table to the new column -->
|
||||
<sql>
|
||||
UPDATE node_notary_request_log SET id_temp = id
|
||||
</sql>
|
||||
<dropPrimaryKey tableName="node_notary_request_log" constraintName="node_notary_request_log_pkey"/>
|
||||
<dropColumn tableName="node_notary_request_log" columnName="id"/>
|
||||
<renameColumn tableName="node_notary_request_log" oldColumnName="id_temp" newColumnName="id"/>
|
||||
<addNotNullConstraint tableName="node_notary_request_log" columnName="id" columnDataType="NVARCHAR(76)"/>
|
||||
<addPrimaryKey columnNames="id" constraintName="node_notary_request_log_pkey" tableName="node_notary_request_log"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet id="notary-request-log-change-id-type-others" author="R3.Corda">
|
||||
<preConditions onFail="MARK_RAN">
|
||||
<not>
|
||||
<dbms type="oracle"/>
|
||||
</not>
|
||||
</preConditions>
|
||||
<dropPrimaryKey tableName="node_notary_request_log" constraintName="node_notary_request_log_pkey"/>
|
||||
<modifyDataType tableName="node_notary_request_log" columnName="id" newDataType="NVARCHAR(76) NOT NULL"/>
|
||||
<addPrimaryKey columnNames="id" constraintName="node_notary_request_log_pkey" tableName="node_notary_request_log"/>
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
@ -0,0 +1,14 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="worker-logging">
|
||||
<addColumn tableName="node_notary_request_log">
|
||||
<column name="worker_node_x500_name" type="NVARCHAR(255)">
|
||||
<constraints nullable="true"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
@ -4,6 +4,7 @@ import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.MerkleTree
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
@ -21,9 +22,13 @@ import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.notary.common.BatchSignature
|
||||
import net.corda.notary.experimental.raft.RaftConfig
|
||||
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
|
||||
import net.corda.notary.experimental.raft.RaftUniquenessProvider
|
||||
import net.corda.notary.jpa.JPANotaryConfiguration
|
||||
import net.corda.notary.jpa.JPANotarySchemaV1
|
||||
import net.corda.notary.jpa.JPAUniquenessProvider
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.generateStateRef
|
||||
@ -52,7 +57,7 @@ class UniquenessProviderTests(
|
||||
@JvmStatic
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
fun data(): Collection<UniquenessProviderFactory> = listOf(
|
||||
PersistentUniquenessProviderFactory(),
|
||||
JPAUniquenessProviderFactory(),
|
||||
RaftUniquenessProviderFactory()
|
||||
)
|
||||
}
|
||||
@ -599,20 +604,6 @@ interface UniquenessProviderFactory {
|
||||
fun cleanUp() {}
|
||||
}
|
||||
|
||||
class PersistentUniquenessProviderFactory : UniquenessProviderFactory {
|
||||
private var database: CordaPersistence? = null
|
||||
|
||||
override fun create(clock: Clock): UniquenessProvider {
|
||||
database?.close()
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(NodeNotarySchemaV1)))
|
||||
return PersistentUniquenessProvider(clock, database!!, TestingNamedCacheFactory(), ::signSingle)
|
||||
}
|
||||
|
||||
override fun cleanUp() {
|
||||
database?.close()
|
||||
}
|
||||
}
|
||||
|
||||
class RaftUniquenessProviderFactory : UniquenessProviderFactory {
|
||||
private var database: CordaPersistence? = null
|
||||
private var provider: RaftUniquenessProvider? = null
|
||||
@ -645,6 +636,36 @@ class RaftUniquenessProviderFactory : UniquenessProviderFactory {
|
||||
}
|
||||
}
|
||||
|
||||
fun signBatch(it: Iterable<SecureHash>): BatchSignature {
|
||||
val root = MerkleTree.getMerkleTree(it.map { it.sha256() })
|
||||
|
||||
val signableMetadata = SignatureMetadata(4, Crypto.findSignatureScheme(pubKey).schemeNumberID)
|
||||
val signature = keyService.sign(SignableData(root.hash, signableMetadata), pubKey)
|
||||
return BatchSignature(signature, root)
|
||||
}
|
||||
|
||||
class JPAUniquenessProviderFactory : UniquenessProviderFactory {
|
||||
private var database: CordaPersistence? = null
|
||||
private val notaryConfig = JPANotaryConfiguration(maxInputStates = 10)
|
||||
private val notaryWorkerName = CordaX500Name.parse("CN=NotaryWorker, O=Corda, L=London, C=GB")
|
||||
|
||||
override fun create(clock: Clock): UniquenessProvider {
|
||||
database?.close()
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(JPANotarySchemaV1)))
|
||||
return JPAUniquenessProvider(
|
||||
clock,
|
||||
database!!,
|
||||
notaryConfig,
|
||||
notaryWorkerName,
|
||||
::signBatch
|
||||
)
|
||||
}
|
||||
|
||||
override fun cleanUp() {
|
||||
database?.close()
|
||||
}
|
||||
}
|
||||
|
||||
var ourKeyPair: KeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
val keyService = MockKeyManagementService(makeTestIdentityService(), ourKeyPair)
|
||||
val pubKey = keyService.freshKey()
|
||||
|
Loading…
Reference in New Issue
Block a user