From 52cbe04b8c439e7b2624e2dda7eca3853b61ffba Mon Sep 17 00:00:00 2001 From: Stefan Iliev <46542846+StefanIliev545@users.noreply.github.com> Date: Tue, 28 Jul 2020 15:50:19 +0100 Subject: [PATCH] EG-2375 - batching notary open sourcing. (#6507) --- .../vault/VaultObserverExceptionTest.kt | 6 +- .../net/corda/node/internal/AbstractNode.kt | 5 - .../node/internal/cordapp/VirtualCordapps.kt | 15 +- .../node/services/schema/NodeSchemaService.kt | 1 - .../transactions/SimpleNotaryService.kt | 49 --- .../net/corda/node/utilities/NotaryLoader.kt | 6 +- .../net/corda/notary/common/BatchSigning.kt | 54 +++ .../notary/jpa/JPANotaryConfiguration.kt | 9 + .../net/corda/notary/jpa/JPANotaryService.kt | 55 +++ .../corda/notary/jpa/JPAUniquenessProvider.kt | 408 ++++++++++++++++++ .../kotlin/net/corda/notary/jpa/Schema.kt | 18 + .../node-notary.changelog-master.xml | 2 + .../migration/node-notary.changelog-v3.xml | 48 +++ .../node-notary.changelog-worker-logging.xml | 14 + .../transactions/UniquenessProviderTests.kt | 51 ++- 15 files changed, 658 insertions(+), 83 deletions(-) delete mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt create mode 100644 node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt create mode 100644 node/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt create mode 100644 node/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt create mode 100644 node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt create mode 100644 node/src/main/kotlin/net/corda/notary/jpa/Schema.kt create mode 100644 node/src/main/resources/migration/node-notary.changelog-v3.xml create mode 100644 node/src/main/resources/migration/node-notary.changelog-worker-logging.xml diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index 601134338f..d1d4cd7b23 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -25,7 +25,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.Permissions import net.corda.node.services.statemachine.StaffedFlowHospital -import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.notary.jpa.JPAUniquenessProvider import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -856,8 +856,8 @@ class VaultObserverExceptionTest { override fun call(): List { return serviceHub.withEntityManager { val criteriaQuery = this.criteriaBuilder.createQuery(String::class.java) - val root = criteriaQuery.from(PersistentUniquenessProvider.CommittedTransaction::class.java) - criteriaQuery.select(root.get(PersistentUniquenessProvider.CommittedTransaction::transactionId.name)) + val root = criteriaQuery.from(JPAUniquenessProvider.CommittedTransaction::class.java) + criteriaQuery.select(root.get(JPAUniquenessProvider.CommittedTransaction::transactionId.name)) val query = this.createQuery(criteriaQuery) query.resultList } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 59b6b4fca7..2c1c9563fa 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -133,7 +133,6 @@ import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.BasicVerifierFactoryService import net.corda.node.services.transactions.DeterministicVerifierFactoryService import net.corda.node.services.transactions.InMemoryTransactionVerifierService -import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.services.transactions.VerifierFactoryService import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.vault.NodeVaultService @@ -792,10 +791,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, ) } - private fun isRunningSimpleNotaryService(configuration: NodeConfiguration): Boolean { - return configuration.notary != null && configuration.notary?.className == SimpleNotaryService::class.java.name - } - private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause) private fun installCordaServices() { diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt index 5ad5add351..72c9b0a90a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt @@ -6,12 +6,12 @@ import net.corda.core.flows.ContractUpgradeFlow import net.corda.core.internal.cordapp.CordappImpl import net.corda.core.internal.location import net.corda.node.VersionInfo -import net.corda.node.services.transactions.NodeNotarySchemaV1 -import net.corda.node.services.transactions.SimpleNotaryService import net.corda.notary.experimental.bftsmart.BFTSmartNotarySchemaV1 import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService import net.corda.notary.experimental.raft.RaftNotarySchemaV1 import net.corda.notary.experimental.raft.RaftNotaryService +import net.corda.notary.jpa.JPANotarySchemaV1 +import net.corda.notary.jpa.JPANotaryService internal object VirtualCordapp { /** A list of the core RPC flows present in Corda */ @@ -46,7 +46,7 @@ internal object VirtualCordapp { } /** A Cordapp for the built-in notary service implementation. */ - fun generateSimpleNotary(versionInfo: VersionInfo): CordappImpl { + fun generateJPANotary(versionInfo: VersionInfo): CordappImpl { return CordappImpl( contractClassNames = listOf(), initiatedFlows = listOf(), @@ -57,15 +57,16 @@ internal object VirtualCordapp { serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), checkpointCustomSerializers = listOf(), - customSchemas = setOf(NodeNotarySchemaV1), + customSchemas = setOf(JPANotarySchemaV1), info = Cordapp.Info.Default("corda-notary", versionInfo.vendor, versionInfo.releaseVersion, "Open Source (Apache 2)"), allFlows = listOf(), - jarPath = SimpleNotaryService::class.java.location, + jarPath = JPANotaryService::class.java.location, jarHash = SecureHash.allOnesHash, minimumPlatformVersion = versionInfo.platformVersion, targetPlatformVersion = versionInfo.platformVersion, - notaryService = SimpleNotaryService::class.java, - isLoaded = false + notaryService = JPANotaryService::class.java, + isLoaded = false, + isVirtual = true ) } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index d38c6371ef..3244385d04 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -66,7 +66,6 @@ class NodeSchemaService(private val extraSchemas: Set = emptySet() // when mapped schemas from the finance module are present, they are considered as internal ones schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" || - schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1" || schema::class.qualifiedName?.startsWith("net.corda.notary.") ?: false } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt deleted file mode 100644 index 055cadab84..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ /dev/null @@ -1,49 +0,0 @@ -package net.corda.node.services.transactions - -import net.corda.core.flows.FlowSession -import net.corda.core.internal.notary.SinglePartyNotaryService -import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.core.schemas.MappedSchema -import net.corda.core.utilities.seconds -import net.corda.node.services.api.ServiceHubInternal -import java.security.PublicKey - -/** An embedded notary service that uses the node's database to store committed states. */ -class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { - private val notaryConfig = services.configuration.notary - ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") - - init { - val mode = if (notaryConfig.validating) "validating" else "non-validating" - log.info("Starting notary in $mode mode") - } - - override val uniquenessProvider = PersistentUniquenessProvider( - services.clock, - services.database, - services.cacheFactory, - ::signTransaction) - - override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { - return if (notaryConfig.validating) { - ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) - } else { - NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) - } - } - - override fun start() {} - override fun stop() {} -} - -// Entities used by a Notary -object NodeNotarySchema - -object NodeNotarySchemaV1 : MappedSchema(schemaFamily = NodeNotarySchema.javaClass, version = 1, - mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java, - PersistentUniquenessProvider.Request::class.java, - PersistentUniquenessProvider.CommittedState::class.java, - PersistentUniquenessProvider.CommittedTransaction::class.java - )) { - override val migrationResource = "node-notary.changelog-master" -} diff --git a/node/src/main/kotlin/net/corda/node/utilities/NotaryLoader.kt b/node/src/main/kotlin/net/corda/node/utilities/NotaryLoader.kt index 749440431f..75fa2efb24 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/NotaryLoader.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/NotaryLoader.kt @@ -9,10 +9,10 @@ import net.corda.node.VersionInfo import net.corda.node.internal.cordapp.VirtualCordapp import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.NotaryConfig -import net.corda.node.services.transactions.SimpleNotaryService import net.corda.nodeapi.internal.cordapp.CordappLoader import net.corda.notary.experimental.bftsmart.BFTSmartNotaryService import net.corda.notary.experimental.raft.RaftNotaryService +import net.corda.notary.jpa.JPANotaryService import java.lang.reflect.InvocationTargetException import java.security.PublicKey @@ -44,8 +44,8 @@ class NotaryLoader( RaftNotaryService::class.java } else -> { - builtInNotary = VirtualCordapp.generateSimpleNotary(versionInfo) - SimpleNotaryService::class.java + builtInNotary = VirtualCordapp.generateJPANotary(versionInfo) + JPANotaryService::class.java } } } else { diff --git a/node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt b/node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt new file mode 100644 index 0000000000..4fc3bb4e25 --- /dev/null +++ b/node/src/main/kotlin/net/corda/notary/common/BatchSigning.kt @@ -0,0 +1,54 @@ +package net.corda.notary.common + +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.MerkleTree +import net.corda.core.crypto.PartialMerkleTree +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignableData +import net.corda.core.crypto.SignatureMetadata +import net.corda.core.crypto.TransactionSignature +import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotaryError +import net.corda.core.node.ServiceHub +import java.security.PublicKey + +typealias BatchSigningFunction = (Iterable) -> BatchSignature + +/** Generates a signature over the bach of [txIds]. */ +fun signBatch( + txIds: Iterable, + notaryIdentityKey: PublicKey, + services: ServiceHub +): BatchSignature { + val merkleTree = MerkleTree.getMerkleTree(txIds.map { it.sha256() }) + val merkleTreeRoot = merkleTree.hash + val signableData = SignableData( + merkleTreeRoot, + SignatureMetadata( + services.myInfo.platformVersion, + Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID + ) + ) + val sig = services.keyManagementService.sign(signableData, notaryIdentityKey) + return BatchSignature(sig, merkleTree) +} + +/** The outcome of just committing a transaction. */ +sealed class InternalResult { + object Success : InternalResult() + data class Failure(val error: NotaryError) : InternalResult() +} + +data class BatchSignature( + val rootSignature: TransactionSignature, + val fullMerkleTree: MerkleTree) { + /** Extracts a signature with a partial Merkle tree for the specified leaf in the batch signature. */ + fun forParticipant(txId: SecureHash): TransactionSignature { + return TransactionSignature( + rootSignature.bytes, + rootSignature.by, + rootSignature.signatureMetadata, + PartialMerkleTree.build(fullMerkleTree, listOf(txId.sha256())) + ) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt b/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt new file mode 100644 index 0000000000..39c8f4dff9 --- /dev/null +++ b/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryConfiguration.kt @@ -0,0 +1,9 @@ +package net.corda.notary.jpa + +data class JPANotaryConfiguration( + val batchSize: Int = 32, + val batchTimeoutMs: Long = 200L, + val maxInputStates: Int = 2000, + val maxDBTransactionRetryCount: Int = 10, + val backOffBaseMs: Long = 20L +) diff --git a/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt b/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt new file mode 100644 index 0000000000..6db10a8333 --- /dev/null +++ b/node/src/main/kotlin/net/corda/notary/jpa/JPANotaryService.kt @@ -0,0 +1,55 @@ +package net.corda.notary.jpa + +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowSession +import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.internal.notary.SinglePartyNotaryService +import net.corda.core.utilities.seconds +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.transactions.NonValidatingNotaryFlow +import net.corda.node.services.transactions.ValidatingNotaryFlow +import net.corda.nodeapi.internal.config.parseAs +import net.corda.notary.common.signBatch +import java.security.PublicKey + +/** Notary service backed by a relational database. */ +class JPANotaryService( + override val services: ServiceHubInternal, + override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { + + private val notaryConfig = services.configuration.notary + ?: throw IllegalArgumentException("Failed to register ${this::class.java}: notary configuration not present") + + + @Suppress("TooGenericExceptionCaught") + override val uniquenessProvider = with(services) { + val jpaNotaryConfig = try { + notaryConfig.extraConfig?.parseAs() ?: JPANotaryConfiguration() + } catch (e: Exception) { + throw IllegalArgumentException("Failed to register ${JPANotaryService::class.java}: extra notary configuration parameters invalid") + } + JPAUniquenessProvider( + clock, + database, + jpaNotaryConfig, + configuration.myLegalName, + ::signTransactionBatch + ) + } + + private fun signTransactionBatch(txIds: Iterable) + = signBatch(txIds, notaryIdentityKey, services) + + override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { + return if (notaryConfig.validating) { + ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) + } else NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) + } + + override fun start() { + } + + override fun stop() { + uniquenessProvider.stop() + } +} diff --git a/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt new file mode 100644 index 0000000000..53ea1749fd --- /dev/null +++ b/node/src/main/kotlin/net/corda/notary/jpa/JPAUniquenessProvider.kt @@ -0,0 +1,408 @@ +package net.corda.notary.jpa + +import com.google.common.collect.Queues +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError +import net.corda.core.flows.StateConsumptionDetails +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.notary.common.BatchSigningFunction +import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.internal.notary.isConsumedByTheSameTx +import net.corda.core.internal.notary.validateTimeWindow +import net.corda.core.schemas.PersistentStateRef +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX +import net.corda.notary.common.InternalResult +import net.corda.serialization.internal.CordaSerializationEncoding +import org.hibernate.Session +import java.sql.SQLException +import java.time.Clock +import java.time.Instant +import java.util.* +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.ThreadSafe +import javax.persistence.Column +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.NamedQuery +import kotlin.concurrent.thread + +/** A JPA backed Uniqueness provider */ +@Suppress("MagicNumber") // database column length +@ThreadSafe +class JPAUniquenessProvider( + val clock: Clock, + val database: CordaPersistence, + val config: JPANotaryConfiguration = JPANotaryConfiguration(), + val notaryWorkerName: CordaX500Name, + val signBatch: BatchSigningFunction +) : UniquenessProvider, SingletonSerializeAsToken() { + + // This is the prefix of the ID in the request log table, to allow running multiple instances that access the + // same table. + private val instanceId = UUID.randomUUID() + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log") + @CordaSerializable + class Request( + @Id + @Column(nullable = true, length = 76) + var id: String? = null, + + @Column(name = "consuming_transaction_id", nullable = true, length = 64) + val consumingTxHash: String?, + + @Column(name = "requesting_party_name", nullable = true, length = 255) + var partyName: String?, + + @Lob + @Column(name = "request_signature", nullable = false) + val requestSignature: ByteArray, + + @Column(name = "request_timestamp", nullable = false) + var requestDate: Instant, + + @Column(name = "worker_node_x500_name", nullable = true, length = 255) + val workerNodeX500Name: String? + ) + + private data class CommitRequest( + val states: List, + val txId: SecureHash, + val callerIdentity: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?, + val references: List, + val future: OpenFuture, + val requestEntity: Request, + val committedStatesEntities: List) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") + @NamedQuery(name = "CommittedState.select", query = "SELECT c from JPAUniquenessProvider\$CommittedState c WHERE c.id in :ids") + class CommittedState( + @EmbeddedId + val id: PersistentStateRef, + @Column(name = "consuming_transaction_id", nullable = false, length = 64) + val consumingTxHash: String) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_txs") + class CommittedTransaction( + @Id + @Column(name = "transaction_id", nullable = false, length = 64) + val transactionId: String + ) + + private val requestQueue = LinkedBlockingQueue(requestQueueSize) + + /** A requestEntity processor thread. */ + private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) { + try { + val buffer = LinkedList() + while (!Thread.interrupted()) { + val drainedSize = Queues.drain(requestQueue, buffer, config.batchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS) + if (drainedSize == 0) continue + processRequests(buffer) + buffer.clear() + } + } catch (_: InterruptedException) { + log.debug { "Process interrupted."} + } + log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." } + } + + fun stop() { + processorThread.interrupt() + } + + companion object { + private const val requestQueueSize = 100_000 + private const val jdbcBatchSize = 100_000 + private val log = contextLogger() + + fun encodeStateRef(s: StateRef): PersistentStateRef { + return PersistentStateRef(s.txhash.toString(), s.index) + } + + fun decodeStateRef(s: PersistentStateRef): StateRef { + return StateRef(txhash = SecureHash.parse(s.txId), index = s.index) + } + } + + /** + * Generates and adds a [CommitRequest] to the requestEntity queue. If the requestEntity queue is full, this method will block + * until space is available. + * + * Returns a future that will complete once the requestEntity is processed, containing the commit [Result]. + */ + override fun commit( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?, + references: List + ): CordaFuture { + val future = openFuture() + val requestEntities = Request(consumingTxHash = txId.toString(), + partyName = callerIdentity.name.toString(), + requestSignature = requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY)).bytes, + requestDate = clock.instant(), + workerNodeX500Name = notaryWorkerName.toString()) + val stateEntities = states.map { + CommittedState( + encodeStateRef(it), + txId.toString() + ) + } + val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future, requestEntities, stateEntities) + + requestQueue.put(request) + + return future + } + + // Safe up to 100k requests per second. + private var nextRequestId = System.currentTimeMillis() * 100 + + private fun logRequests(requests: List) { + database.transaction { + for (request in requests) { + request.requestEntity.id = "$instanceId:${(nextRequestId++).toString(16)}" + session.persist(request.requestEntity) + } + } + } + + private fun commitRequests(session: Session, requests: List) { + for (request in requests) { + for (cs in request.committedStatesEntities) { + session.persist(cs) + } + session.persist(CommittedTransaction(request.txId.toString())) + } + } + + private fun findAlreadyCommitted(session: Session, states: List, references: List): Map { + val persistentStateRefs = (states + references).map { encodeStateRef(it) }.toSet() + val committedStates = mutableListOf() + + for (idsBatch in persistentStateRefs.chunked(config.maxInputStates)) { + @Suppress("UNCHECKED_CAST") + val existing = session + .createNamedQuery("CommittedState.select") + .setParameter("ids", idsBatch) + .resultList as List + committedStates.addAll(existing) + } + + return committedStates.map { + val stateRef = StateRef(txhash = SecureHash.parse(it.id.txId), index = it.id.index) + val consumingTxId = SecureHash.parse(it.consumingTxHash) + if (stateRef in references) { + stateRef to StateConsumptionDetails(consumingTxId.sha256(), type = StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + } else { + stateRef to StateConsumptionDetails(consumingTxId.sha256()) + } + }.toMap() + } + + private fun withRetry(block: () -> T): T { + var retryCount = 0 + var backOff = config.backOffBaseMs + var exceptionCaught: SQLException? = null + while (retryCount <= config.maxDBTransactionRetryCount) { + try { + val res = block() + return res + } catch (e: SQLException) { + retryCount++ + Thread.sleep(backOff) + backOff *= 2 + exceptionCaught = e + } + } + throw exceptionCaught!! + } + + private fun findAllConflicts(session: Session, requests: List): MutableMap { + log.info("Processing notarization requests with ${requests.sumBy { it.states.size }} input states and ${requests.sumBy { it.references.size }} references") + + val allStates = requests.flatMap { it.states } + val allReferences = requests.flatMap { it.references } + return findAlreadyCommitted(session, allStates, allReferences).toMutableMap() + } + + private fun processRequest( + session: Session, + request: CommitRequest, + consumedStates: MutableMap, + processedTxIds: MutableMap, + toCommit: MutableList + ): InternalResult { + val conflicts = (request.states + request.references).mapNotNull { + if (consumedStates.containsKey(it)) it to consumedStates[it]!! + else null + }.toMap() + + return if (conflicts.isNotEmpty()) { + handleStateConflicts(request, conflicts, session) + } else { + handleNoStateConflicts(request, toCommit, consumedStates, processedTxIds, session) + } + } + + /** + * Process the [request] given there are conflicting states already present in the DB or current batch. + * + * To ensure idempotency, if the request's transaction matches a previously consumed transaction then the + * same result (success) can be returned without committing it to the DB. Failure is only returned in the + * case where the request is not a duplicate of a previously processed request and hence it is a genuine + * double spend attempt. + */ + private fun handleStateConflicts( + request: CommitRequest, + stateConflicts: Map, + session: Session + ): InternalResult { + return when { + isConsumedByTheSameTx(request.txId.sha256(), stateConflicts) -> { + InternalResult.Success + } + request.states.isEmpty() && isPreviouslyNotarised(session, request.txId) -> { + InternalResult.Success + } + else -> { + InternalResult.Failure(NotaryError.Conflict(request.txId, stateConflicts)) + } + } + } + + /** + * Process the [request] given there are no conflicting states already present in the DB or current batch. + * + * This method performs time window validation and adds the request to the commit list if applicable. + * It also checks the [processedTxIds] map to ensure that any time-window only duplicates within the batch + * are only committed once. + */ + private fun handleNoStateConflicts( + request: CommitRequest, + toCommit: MutableList, + consumedStates: MutableMap, + processedTxIds: MutableMap, + session: Session + ): InternalResult { + return when { + request.states.isEmpty() && isPreviouslyNotarised(session, request.txId) -> { + InternalResult.Success + } + processedTxIds.containsKey(request.txId) -> { + processedTxIds[request.txId]!! + } + else -> { + val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) + val internalResult = if (outsideTimeWindowError != null) { + InternalResult.Failure(outsideTimeWindowError) + } else { + // Mark states as consumed to capture conflicting transactions in the same batch + request.states.forEach { + consumedStates[it] = StateConsumptionDetails(request.txId.sha256()) + } + toCommit.add(request) + InternalResult.Success + } + // Store transaction result to capture conflicting time-window only transactions in the same batch + processedTxIds[request.txId] = internalResult + internalResult + } + } + } + + private fun isPreviouslyNotarised(session: Session, txId: SecureHash): Boolean { + return session.find(CommittedTransaction::class.java, txId.toString()) != null + } + + @Suppress("TooGenericExceptionCaught") + private fun processRequests(requests: List) { + try { + // Note that there is an additional retry mechanism within the transaction itself. + val res = withRetry { + database.transaction { + val em = session.entityManagerFactory.createEntityManager() + em.unwrap(Session::class.java).jdbcBatchSize = jdbcBatchSize + + val toCommit = mutableListOf() + val consumedStates = findAllConflicts(session, requests) + val processedTxIds = mutableMapOf() + + val results = requests.map { request -> + processRequest(session, request, consumedStates, processedTxIds, toCommit) + } + + logRequests(requests) + commitRequests(session, toCommit) + + results + } + } + completeResponses(requests, res) + } catch (e: Exception) { + log.warn("Error processing commit requests", e) + for (request in requests) { + respondWithError(request, e) + } + } + } + + private fun completeResponses(requests: List, results: List): Int { + val zippedResults = requests.zip(results) + val successfulRequests = zippedResults + .filter { it.second is InternalResult.Success } + .map { it.first.txId } + .distinct() + val signature = if (successfulRequests.isNotEmpty()) + signBatch(successfulRequests) + else null + + var inputStateCount = 0 + for ((request, result) in zippedResults) { + val resultToSet = when { + result is InternalResult.Failure -> UniquenessProvider.Result.Failure(result.error) + signature != null -> UniquenessProvider.Result.Success(signature.forParticipant(request.txId)) + else -> throw IllegalStateException("Signature is required but not found") + } + + request.future.set(resultToSet) + inputStateCount += request.states.size + } + return inputStateCount + } + + private fun respondWithError(request: CommitRequest, exception: Exception) { + if (exception is NotaryInternalException) { + request.future.set(UniquenessProvider.Result.Failure(exception.error)) + } else { + request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) + } + } +} diff --git a/node/src/main/kotlin/net/corda/notary/jpa/Schema.kt b/node/src/main/kotlin/net/corda/notary/jpa/Schema.kt new file mode 100644 index 0000000000..209e894e48 --- /dev/null +++ b/node/src/main/kotlin/net/corda/notary/jpa/Schema.kt @@ -0,0 +1,18 @@ +package net.corda.notary.jpa + +import net.corda.core.schemas.MappedSchema + +object JPANotarySchema + +object JPANotarySchemaV1 : MappedSchema( + schemaFamily = JPANotarySchema.javaClass, + version = 1, + mappedTypes = listOf( + JPAUniquenessProvider.CommittedState::class.java, + JPAUniquenessProvider.Request::class.java, + JPAUniquenessProvider.CommittedTransaction::class.java + ) +) { + override val migrationResource: String? + get() = "node-notary.changelog-master" +} diff --git a/node/src/main/resources/migration/node-notary.changelog-master.xml b/node/src/main/resources/migration/node-notary.changelog-master.xml index d8a5a61b8c..506cd551ad 100644 --- a/node/src/main/resources/migration/node-notary.changelog-master.xml +++ b/node/src/main/resources/migration/node-notary.changelog-master.xml @@ -9,5 +9,7 @@ + + diff --git a/node/src/main/resources/migration/node-notary.changelog-v3.xml b/node/src/main/resources/migration/node-notary.changelog-v3.xml new file mode 100644 index 0000000000..596d0876d4 --- /dev/null +++ b/node/src/main/resources/migration/node-notary.changelog-v3.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + + UPDATE node_notary_request_log SET id_temp = id + + + + + + + + + + + + + + + + + + + + diff --git a/node/src/main/resources/migration/node-notary.changelog-worker-logging.xml b/node/src/main/resources/migration/node-notary.changelog-worker-logging.xml new file mode 100644 index 0000000000..096e6478f7 --- /dev/null +++ b/node/src/main/resources/migration/node-notary.changelog-worker-logging.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt index 07679e0f77..29d347b427 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt @@ -4,6 +4,7 @@ import com.codahale.metrics.MetricRegistry import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.Crypto import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.MerkleTree import net.corda.core.crypto.NullKeys import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignableData @@ -21,9 +22,13 @@ import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.notary.common.BatchSignature import net.corda.notary.experimental.raft.RaftConfig import net.corda.notary.experimental.raft.RaftNotarySchemaV1 import net.corda.notary.experimental.raft.RaftUniquenessProvider +import net.corda.notary.jpa.JPANotaryConfiguration +import net.corda.notary.jpa.JPANotarySchemaV1 +import net.corda.notary.jpa.JPAUniquenessProvider import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.core.generateStateRef @@ -52,7 +57,7 @@ class UniquenessProviderTests( @JvmStatic @Parameterized.Parameters(name = "{0}") fun data(): Collection = listOf( - PersistentUniquenessProviderFactory(), + JPAUniquenessProviderFactory(), RaftUniquenessProviderFactory() ) } @@ -599,20 +604,6 @@ interface UniquenessProviderFactory { fun cleanUp() {} } -class PersistentUniquenessProviderFactory : UniquenessProviderFactory { - private var database: CordaPersistence? = null - - override fun create(clock: Clock): UniquenessProvider { - database?.close() - database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(NodeNotarySchemaV1))) - return PersistentUniquenessProvider(clock, database!!, TestingNamedCacheFactory(), ::signSingle) - } - - override fun cleanUp() { - database?.close() - } -} - class RaftUniquenessProviderFactory : UniquenessProviderFactory { private var database: CordaPersistence? = null private var provider: RaftUniquenessProvider? = null @@ -645,6 +636,36 @@ class RaftUniquenessProviderFactory : UniquenessProviderFactory { } } +fun signBatch(it: Iterable): BatchSignature { + val root = MerkleTree.getMerkleTree(it.map { it.sha256() }) + + val signableMetadata = SignatureMetadata(4, Crypto.findSignatureScheme(pubKey).schemeNumberID) + val signature = keyService.sign(SignableData(root.hash, signableMetadata), pubKey) + return BatchSignature(signature, root) +} + +class JPAUniquenessProviderFactory : UniquenessProviderFactory { + private var database: CordaPersistence? = null + private val notaryConfig = JPANotaryConfiguration(maxInputStates = 10) + private val notaryWorkerName = CordaX500Name.parse("CN=NotaryWorker, O=Corda, L=London, C=GB") + + override fun create(clock: Clock): UniquenessProvider { + database?.close() + database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(JPANotarySchemaV1))) + return JPAUniquenessProvider( + clock, + database!!, + notaryConfig, + notaryWorkerName, + ::signBatch + ) + } + + override fun cleanUp() { + database?.close() + } +} + var ourKeyPair: KeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) val keyService = MockKeyManagementService(makeTestIdentityService(), ourKeyPair) val pubKey = keyService.freshKey()