From 5fe365d483296b86a597b36da8cc3ca14a0d203d Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Thu, 24 Aug 2017 11:31:41 +0100 Subject: [PATCH] rewrite BftNonValidatingNotarService to use Hibernate --- .../node/services/schema/NodeSchemaService.kt | 5 +- .../BFTNonValidatingNotaryService.kt | 50 ++++++++-- .../node/services/transactions/BFTSMaRt.kt | 13 +-- .../PersistentUniquenessProvider.kt | 92 ++++++++++--------- 4 files changed, 101 insertions(+), 59 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index e5bcb4850d..331ef4461d 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -17,6 +17,7 @@ import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.vault.VaultSchemaV1 @@ -40,6 +41,7 @@ class NodeSchemaService(customSchemas: Set = emptySet()) : SchemaS DBTransactionMappingStorage.DBTransactionMapping::class.java, PersistentKeyManagementService.PersistentKey::class.java, PersistentUniquenessProvider.PersistentUniqueness::class.java, + PersistentUniquenessProvider.PersistentNotaryCommit::class.java, NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java, PersistentNetworkMapService.NetworkNode::class.java, @@ -47,7 +49,8 @@ class NodeSchemaService(customSchemas: Set = emptySet()) : SchemaS NodeMessagingClient.ProcessedMessage::class.java, NodeMessagingClient.RetryMessage::class.java, NodeAttachmentService.DBAttachment::class.java, - RaftUniquenessProvider.RaftState::class.java + RaftUniquenessProvider.RaftState::class.java, + BFTNonValidatingNotaryService.PersistedCommittedState::class.java )) // Required schemas are those used by internal Corda services diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index b73868686d..feb4e83e55 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -2,15 +2,15 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.SettableFuture -import net.corda.core.crypto.Crypto -import net.corda.core.crypto.DigitalSignature -import net.corda.core.crypto.SignableData -import net.corda.core.crypto.SignatureMetadata +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.* import net.corda.core.flows.FlowLogic import net.corda.core.flows.NotaryException import net.corda.core.identity.Party import net.corda.core.node.services.NotaryService import net.corda.core.node.services.TimeWindowChecker +import net.corda.core.node.services.UniquenessProvider +import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.transactions.FilteredTransaction @@ -19,6 +19,10 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.core.utilities.unwrap import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.node.utilities.NODE_DATABASE_PREFIX +import org.bouncycastle.asn1.x500.X500Name +import javax.persistence.Entity import kotlin.concurrent.thread /** @@ -49,7 +53,7 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, c thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { configHandle.use { val timeWindowChecker = TimeWindowChecker(services.clock) - val replica = Replica(it, replicaId, "bft_smart_notary_committed_states", services, timeWindowChecker) + val replica = Replica(it, replicaId, { createMap() }, services, timeWindowChecker) replicaHolder.set(replica) log.info("BFT SMaRt replica $replicaId is running.") } @@ -88,11 +92,43 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, c } } + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}bft_smart_notary_committed_states") + class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty) + : PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party) + + fun createMap(): AppendOnlyPersistentMap = + AppendOnlyPersistentMap( + toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, + fromPersistentEntity = { + //TODO null check will become obsolete after making DB/JPA columns not nullable + var txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + var index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") + Pair(StateRef(txhash = SecureHash.parse(txId), index = index), + UniquenessProvider.ConsumingTx( + id = SecureHash.parse(it.consumingTxHash), + inputIndex = it.consumingIndex, + requestingParty = Party( + name = X500Name(it.party.name), + owningKey = parsePublicKeyBase58(it.party.owningKey)))) + }, + toPersistentEntity = { (txHash, index) : StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx -> + PersistedCommittedState( + id = PersistentStateRef(txHash.toString(), index), + consumingTxHash = id.toString(), + consumingIndex = inputIndex, + party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(), + requestingParty.owningKey.toBase58String()) + ) + }, + persistentEntityClass = PersistedCommittedState::class.java + ) + private class Replica(config: BFTSMaRtConfig, replicaId: Int, - tableName: String, + createMap: () -> AppendOnlyPersistentMap, services: ServiceHubInternal, - timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, tableName, services, timeWindowChecker) { + timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, createMap, services, timeWindowChecker) { override fun executeCommand(command: ByteArray): ByteArray { val request = command.deserialize() diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index bd8c2069b7..ba39a29689 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -22,6 +22,7 @@ import net.corda.core.internal.declaredField import net.corda.core.internal.toTypedArray import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.UniquenessProvider +import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize @@ -33,7 +34,7 @@ import net.corda.core.utilities.loggerFor import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Replica -import net.corda.node.utilities.JDBCHashMap +import net.corda.node.utilities.AppendOnlyPersistentMap import java.nio.file.Path import java.util.* @@ -172,7 +173,8 @@ object BFTSMaRt { */ abstract class Replica(config: BFTSMaRtConfig, replicaId: Int, - tableName: String, + createMap: () -> AppendOnlyPersistentMap, protected val services: ServiceHubInternal, private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() { companion object { @@ -191,9 +193,8 @@ object BFTSMaRt { } override fun getStateManager() = stateManagerOverride - // TODO: Use proper DB schema instead of JDBCHashMap. // Must be initialised before ServiceReplica is started - private val commitLog = services.database.transaction { JDBCHashMap(tableName) } + private val commitLog = services.database.transaction { createMap() } private val replica = run { config.waitUntilReplicaWillNotPrintStackTrace(replicaId) @Suppress("LeakingThis") @@ -229,7 +230,7 @@ object BFTSMaRt { log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } states.forEachIndexed { i, stateRef -> val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) - commitLog.put(stateRef, txInfo) + commitLog[stateRef] = txInfo } } else { log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" } @@ -261,7 +262,7 @@ object BFTSMaRt { // LinkedHashMap for deterministic serialisation val m = LinkedHashMap() services.database.transaction { - commitLog.forEach { m[it.key] = it.value } + commitLog.allPersisted().forEach { m[it.first] = it.second } } return m.serialize().bytes } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 820f3d5809..8c4d0f18e1 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -3,10 +3,12 @@ package net.corda.node.services.transactions import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.crypto.parsePublicKeyBase58 +import net.corda.core.crypto.toBase58String import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.node.services.UniquenessException import net.corda.core.node.services.UniquenessProvider +import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.loggerFor import net.corda.node.utilities.* @@ -20,41 +22,35 @@ import javax.persistence.* @ThreadSafe class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() { - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log") - class PersistentUniqueness ( + @MappedSuperclass + open class PersistentUniqueness ( + @EmbeddedId + var id: PersistentStateRef = PersistentStateRef(), - @EmbeddedId - var id: StateRef = StateRef(), + @Column(name = "consuming_transaction_id") + var consumingTxHash: String = "", - @Column(name = "consuming_transaction_id") - var consumingTxHash: String = "", + @Column(name = "consuming_input_index", length = 36) + var consumingIndex: Int = 0, - @Column(name = "consuming_input_index", length = 36) - var consumingIndex: Int = 0, + @Embedded + var party: PersistentParty = PersistentParty() + ) - @Embedded - var party: Party = Party() - ) { + @Embeddable + data class PersistentParty( + @Column(name = "requesting_party_name") + var name: String = "", - @Embeddable - data class StateRef ( - @Column(name = "transaction_id") - var txId: String = "", + @Column(name = "requesting_party_key", length = 255) + var owningKey: String = "" + ): Serializable - @Column(name = "output_index", length = 36) - var index: Int = 0 - ) : Serializable + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log") + class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty): + PersistentUniqueness(id, consumingTxHash, consumingIndex, party) - @Embeddable - data class Party ( - @Column(name = "requesting_party_name") - var name: String = "", - - @Column(name = "requesting_party_key", length = 255) - var owningKey: String = "" - ) : Serializable - } private class InnerState { val committedStates = createMap() @@ -65,26 +61,32 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok companion object { private val log = loggerFor() - fun createMap(): AppendOnlyPersistentMap { - return AppendOnlyPersistentMap( - toPersistentEntityKey = { PersistentUniqueness.StateRef(it.txhash.toString(), it.index) }, - fromPersistentEntity = { - Pair(StateRef(SecureHash.parse(it.id.txId), it.id.index), - UniquenessProvider.ConsumingTx(SecureHash.parse(it.consumingTxHash), it.consumingIndex, - Party(X500Name(it.party.name), parsePublicKeyBase58(it.party.owningKey)))) + fun createMap(): AppendOnlyPersistentMap = + AppendOnlyPersistentMap( + toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, + fromPersistentEntity = { + //TODO null check will become obsolete after making DB/JPA columns not nullable + var txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + var index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") + Pair(StateRef(txhash = SecureHash.parse(txId), index = index), + UniquenessProvider.ConsumingTx( + id = SecureHash.parse(it.consumingTxHash), + inputIndex = it.consumingIndex, + requestingParty = Party( + name = X500Name(it.party.name), + owningKey = parsePublicKeyBase58(it.party.owningKey)))) + }, + toPersistentEntity = { (txHash, index) : StateRef, (id, inputIndex, requestingParty) : UniquenessProvider.ConsumingTx -> + PersistentNotaryCommit( + id = PersistentStateRef(txHash.toString(), index), + consumingTxHash = id.toString(), + consumingIndex = inputIndex, + party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.toBase58String()) + ) }, - toPersistentEntity = { key: StateRef, value: UniquenessProvider.ConsumingTx -> - PersistentUniqueness().apply { - id = PersistentUniqueness.StateRef(key.txhash.toString(), key.index) - consumingTxHash = value.id.toString() - consumingIndex = value.inputIndex - party = PersistentUniqueness.Party(value.requestingParty.name.toString()) - } - }, - persistentEntityClass = PersistentUniqueness::class.java + persistentEntityClass = PersistentNotaryCommit::class.java ) } - } override fun commit(states: List, txId: SecureHash, callerIdentity: Party) {