rewrite BftNonValidatingNotarService to use Hibernate

This commit is contained in:
szymonsztuka 2017-08-24 11:31:41 +01:00 committed by GitHub
parent 65c5ce65a6
commit 5fe365d483
4 changed files with 101 additions and 59 deletions

View File

@ -17,6 +17,7 @@ import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.vault.VaultSchemaV1 import net.corda.node.services.vault.VaultSchemaV1
@ -40,6 +41,7 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
DBTransactionMappingStorage.DBTransactionMapping::class.java, DBTransactionMappingStorage.DBTransactionMapping::class.java,
PersistentKeyManagementService.PersistentKey::class.java, PersistentKeyManagementService.PersistentKey::class.java,
PersistentUniquenessProvider.PersistentUniqueness::class.java, PersistentUniquenessProvider.PersistentUniqueness::class.java,
PersistentUniquenessProvider.PersistentNotaryCommit::class.java,
NodeSchedulerService.PersistentScheduledState::class.java, NodeSchedulerService.PersistentScheduledState::class.java,
NodeAttachmentService.DBAttachment::class.java, NodeAttachmentService.DBAttachment::class.java,
PersistentNetworkMapService.NetworkNode::class.java, PersistentNetworkMapService.NetworkNode::class.java,
@ -47,7 +49,8 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
NodeMessagingClient.ProcessedMessage::class.java, NodeMessagingClient.ProcessedMessage::class.java,
NodeMessagingClient.RetryMessage::class.java, NodeMessagingClient.RetryMessage::class.java,
NodeAttachmentService.DBAttachment::class.java, NodeAttachmentService.DBAttachment::class.java,
RaftUniquenessProvider.RaftState::class.java RaftUniquenessProvider.RaftState::class.java,
BFTNonValidatingNotaryService.PersistedCommittedState::class.java
)) ))
// Required schemas are those used by internal Corda services // Required schemas are those used by internal Corda services

View File

@ -2,15 +2,15 @@ package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.Crypto import net.corda.core.contracts.StateRef
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.*
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotaryException import net.corda.core.flows.NotaryException
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.services.NotaryService import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.FilteredTransaction
@ -19,6 +19,10 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import org.bouncycastle.asn1.x500.X500Name
import javax.persistence.Entity
import kotlin.concurrent.thread import kotlin.concurrent.thread
/** /**
@ -49,7 +53,7 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, c
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use { configHandle.use {
val timeWindowChecker = TimeWindowChecker(services.clock) val timeWindowChecker = TimeWindowChecker(services.clock)
val replica = Replica(it, replicaId, "bft_smart_notary_committed_states", services, timeWindowChecker) val replica = Replica(it, replicaId, { createMap() }, services, timeWindowChecker)
replicaHolder.set(replica) replicaHolder.set(replica)
log.info("BFT SMaRt replica $replicaId is running.") log.info("BFT SMaRt replica $replicaId is running.")
} }
@ -88,11 +92,43 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, c
} }
} }
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}bft_smart_notary_committed_states")
class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty)
: PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef> =
AppendOnlyPersistentMap(
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
//TODO null check will become obsolete after making DB/JPA columns not nullable
var txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
var index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
UniquenessProvider.ConsumingTx(
id = SecureHash.parse(it.consumingTxHash),
inputIndex = it.consumingIndex,
requestingParty = Party(
name = X500Name(it.party.name),
owningKey = parsePublicKeyBase58(it.party.owningKey))))
},
toPersistentEntity = { (txHash, index) : StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx ->
PersistedCommittedState(
id = PersistentStateRef(txHash.toString(), index),
consumingTxHash = id.toString(),
consumingIndex = inputIndex,
party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(),
requestingParty.owningKey.toBase58String())
)
},
persistentEntityClass = PersistedCommittedState::class.java
)
private class Replica(config: BFTSMaRtConfig, private class Replica(config: BFTSMaRtConfig,
replicaId: Int, replicaId: Int,
tableName: String, createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
services: ServiceHubInternal, services: ServiceHubInternal,
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, tableName, services, timeWindowChecker) { timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, createMap, services, timeWindowChecker) {
override fun executeCommand(command: ByteArray): ByteArray { override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>() val request = command.deserialize<BFTSMaRt.CommitRequest>()

View File

@ -22,6 +22,7 @@ import net.corda.core.internal.declaredField
import net.corda.core.internal.toTypedArray import net.corda.core.internal.toTypedArray
import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -33,7 +34,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Client
import net.corda.node.services.transactions.BFTSMaRt.Replica import net.corda.node.services.transactions.BFTSMaRt.Replica
import net.corda.node.utilities.JDBCHashMap import net.corda.node.utilities.AppendOnlyPersistentMap
import java.nio.file.Path import java.nio.file.Path
import java.util.* import java.util.*
@ -172,7 +173,8 @@ object BFTSMaRt {
*/ */
abstract class Replica(config: BFTSMaRtConfig, abstract class Replica(config: BFTSMaRtConfig,
replicaId: Int, replicaId: Int,
tableName: String, createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
protected val services: ServiceHubInternal, protected val services: ServiceHubInternal,
private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() { private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
companion object { companion object {
@ -191,9 +193,8 @@ object BFTSMaRt {
} }
override fun getStateManager() = stateManagerOverride override fun getStateManager() = stateManagerOverride
// TODO: Use proper DB schema instead of JDBCHashMap.
// Must be initialised before ServiceReplica is started // Must be initialised before ServiceReplica is started
private val commitLog = services.database.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) } private val commitLog = services.database.transaction { createMap() }
private val replica = run { private val replica = run {
config.waitUntilReplicaWillNotPrintStackTrace(replicaId) config.waitUntilReplicaWillNotPrintStackTrace(replicaId)
@Suppress("LeakingThis") @Suppress("LeakingThis")
@ -229,7 +230,7 @@ object BFTSMaRt {
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
states.forEachIndexed { i, stateRef -> states.forEachIndexed { i, stateRef ->
val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
commitLog.put(stateRef, txInfo) commitLog[stateRef] = txInfo
} }
} else { } else {
log.debug { "Conflict detected the following inputs have already been committed: ${conflicts.keys.joinToString()}" } log.debug { "Conflict detected the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
@ -261,7 +262,7 @@ object BFTSMaRt {
// LinkedHashMap for deterministic serialisation // LinkedHashMap for deterministic serialisation
val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>() val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
services.database.transaction { services.database.transaction {
commitLog.forEach { m[it.key] = it.value } commitLog.allPersisted().forEach { m[it.first] = it.second }
} }
return m.serialize().bytes return m.serialize().bytes
} }

View File

@ -3,10 +3,12 @@ package net.corda.node.services.transactions
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58 import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.node.services.UniquenessException import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.utilities.* import net.corda.node.utilities.*
@ -20,12 +22,10 @@ import javax.persistence.*
@ThreadSafe @ThreadSafe
class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() { class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() {
@Entity @MappedSuperclass
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log") open class PersistentUniqueness (
class PersistentUniqueness (
@EmbeddedId @EmbeddedId
var id: StateRef = StateRef(), var id: PersistentStateRef = PersistentStateRef(),
@Column(name = "consuming_transaction_id") @Column(name = "consuming_transaction_id")
var consumingTxHash: String = "", var consumingTxHash: String = "",
@ -34,27 +34,23 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
var consumingIndex: Int = 0, var consumingIndex: Int = 0,
@Embedded @Embedded
var party: Party = Party() var party: PersistentParty = PersistentParty()
) { )
@Embeddable @Embeddable
data class StateRef ( data class PersistentParty(
@Column(name = "transaction_id")
var txId: String = "",
@Column(name = "output_index", length = 36)
var index: Int = 0
) : Serializable
@Embeddable
data class Party (
@Column(name = "requesting_party_name") @Column(name = "requesting_party_name")
var name: String = "", var name: String = "",
@Column(name = "requesting_party_key", length = 255) @Column(name = "requesting_party_key", length = 255)
var owningKey: String = "" var owningKey: String = ""
) : Serializable ): Serializable
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log")
class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty):
PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
private class InnerState { private class InnerState {
val committedStates = createMap() val committedStates = createMap()
@ -65,25 +61,31 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
companion object { companion object {
private val log = loggerFor<PersistentUniquenessProvider>() private val log = loggerFor<PersistentUniquenessProvider>()
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentUniqueness, PersistentUniqueness.StateRef> { fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentNotaryCommit, PersistentStateRef> =
return AppendOnlyPersistentMap( AppendOnlyPersistentMap(
toPersistentEntityKey = { PersistentUniqueness.StateRef(it.txhash.toString(), it.index) }, toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = { fromPersistentEntity = {
Pair(StateRef(SecureHash.parse(it.id.txId), it.id.index), //TODO null check will become obsolete after making DB/JPA columns not nullable
UniquenessProvider.ConsumingTx(SecureHash.parse(it.consumingTxHash), it.consumingIndex, var txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
Party(X500Name(it.party.name), parsePublicKeyBase58(it.party.owningKey)))) var index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
UniquenessProvider.ConsumingTx(
id = SecureHash.parse(it.consumingTxHash),
inputIndex = it.consumingIndex,
requestingParty = Party(
name = X500Name(it.party.name),
owningKey = parsePublicKeyBase58(it.party.owningKey))))
}, },
toPersistentEntity = { key: StateRef, value: UniquenessProvider.ConsumingTx -> toPersistentEntity = { (txHash, index) : StateRef, (id, inputIndex, requestingParty) : UniquenessProvider.ConsumingTx ->
PersistentUniqueness().apply { PersistentNotaryCommit(
id = PersistentUniqueness.StateRef(key.txhash.toString(), key.index) id = PersistentStateRef(txHash.toString(), index),
consumingTxHash = value.id.toString() consumingTxHash = id.toString(),
consumingIndex = value.inputIndex consumingIndex = inputIndex,
party = PersistentUniqueness.Party(value.requestingParty.name.toString()) party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.toBase58String())
} )
}, },
persistentEntityClass = PersistentUniqueness::class.java persistentEntityClass = PersistentNotaryCommit::class.java
) )
}
} }
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) { override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {