mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-1208: Notary service should persist the notarisation request si… (#2823)
* CORDA-1208: Notary service should persist the notarisation request signature along with the committed input states. This required modifying the uniqueness provider interface to accept the signature in addition to input states. Until now the committed state log used to be stored as a map of (state reference -> (tranasction id, consuming party)). Adding the serialized signature would mean inflating each state entry by around 700 bytes, which would be grossly inefficient. Instead, two tables are now used: one for storing (state referece -> transaction id) map, and another for storing the notarisation request details (transaction id, consuming party, date, signature). * Update api - all of these changes are only related to custom notaries
This commit is contained in:
parent
15e4449b38
commit
e31d2b0cad
@ -1460,7 +1460,7 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net
|
||||
@co.paralleluniverse.fibers.Suspendable protected final void checkNotary(net.corda.core.identity.Party)
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.FlowSession getOtherSideSession()
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService()
|
||||
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.TransactionParts receiveAndVerifyTx()
|
||||
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload)
|
||||
##
|
||||
public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic
|
||||
public <init>(net.corda.core.flows.FlowSession)
|
||||
@ -2066,7 +2066,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService
|
||||
public <init>()
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
|
||||
@org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
|
||||
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
|
||||
@ -2082,7 +2082,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError()
|
||||
##
|
||||
public interface net.corda.core.node.services.UniquenessProvider
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object
|
||||
public <init>(Map)
|
||||
|
@ -127,7 +127,7 @@ class NotaryFlow {
|
||||
* It checks that the time-window command is valid (if present) and commits the input state, or returns a conflict
|
||||
* if any of the input states have been previously committed.
|
||||
*
|
||||
* Additional transaction validation logic can be added when implementing [receiveAndVerifyTx].
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
|
||||
@ -137,13 +137,15 @@ class NotaryFlow {
|
||||
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
|
||||
"We are not a notary on the network"
|
||||
}
|
||||
|
||||
val requestPayload = otherSideSession.receive<NotarisationPayload>().unwrap { it }
|
||||
var txId: SecureHash? = null
|
||||
try {
|
||||
val parts = receiveAndVerifyTx()
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
service.validateTimeWindow(parts.timestamp)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
@ -152,11 +154,10 @@ class NotaryFlow {
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement custom logic to receive the transaction to notarise, and perform verification based on validity and
|
||||
* privacy requirements.
|
||||
* Implement custom logic to perform transaction verification based on validity and privacy requirements.
|
||||
*/
|
||||
@Suspendable
|
||||
abstract fun receiveAndVerifyTx(): TransactionParts
|
||||
protected abstract fun validateRequest(requestPayload: NotarisationPayload): TransactionParts
|
||||
|
||||
/** Check if transaction is intended to be signed by this notary. */
|
||||
@Suspendable
|
||||
|
@ -397,10 +397,9 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade
|
||||
}
|
||||
|
||||
/** Verifies that the correct notarisation request was signed by the counterparty. */
|
||||
fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) {
|
||||
fun NotaryFlow.Service.validateRequestSignature(request: NotarisationRequest, signature: NotarisationRequestSignature) {
|
||||
val requestingParty = otherSideSession.counterparty
|
||||
request.verifySignature(signature, requestingParty)
|
||||
// TODO: persist the signature for traceability. Do we need to persist the request as well?
|
||||
}
|
||||
|
||||
/** Creates a signature over the notarisation request using the legal identity key. */
|
||||
|
@ -79,9 +79,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party) {
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) {
|
||||
try {
|
||||
uniquenessProvider.commit(inputs, txId, caller)
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature)
|
||||
} catch (e: NotaryInternalException) {
|
||||
if (e.error is NotaryError.Conflict) {
|
||||
val conflicts = inputs.filterIndexed { _, stateRef ->
|
||||
|
@ -3,6 +3,7 @@ package net.corda.core.node.services
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
@ -14,11 +15,12 @@ import net.corda.core.serialization.CordaSerializable
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party)
|
||||
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature)
|
||||
|
||||
/** Specifies the consuming transaction for every conflicting state. */
|
||||
@CordaSerializable
|
||||
@Deprecated("No longer used due to potential privacy leak")
|
||||
@Suppress("DEPRECATION")
|
||||
data class Conflict(val stateHistory: Map<StateRef, ConsumingTx>)
|
||||
|
||||
/**
|
||||
@ -26,6 +28,7 @@ interface UniquenessProvider {
|
||||
* the caller identity requesting the commit.
|
||||
*/
|
||||
@CordaSerializable
|
||||
@Deprecated("No longer used")
|
||||
data class ConsumingTx(val id: SecureHash, val inputIndex: Int, val requestingParty: Party)
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,31 @@
|
||||
package net.corda.core.serialization
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NotaryExceptionSerializationTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Test
|
||||
fun testSerializationRoundTrip() {
|
||||
val txhash = SecureHash.randomSHA256()
|
||||
val stateHistory: Map<StateRef, StateConsumptionDetails> = mapOf(
|
||||
StateRef(txhash, 0) to StateConsumptionDetails(txhash.sha256())
|
||||
)
|
||||
val error = NotaryError.Conflict(txhash, stateHistory)
|
||||
val instance = NotaryException(error)
|
||||
val instanceOnTheOtherSide = instance.serialize().bytes.deserialize<NotaryException>()
|
||||
|
||||
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
||||
}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package net.corda.core.serialization
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.UniquenessException
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class UniquenessExceptionSerializationTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Test
|
||||
fun testSerializationRoundTrip() {
|
||||
val txhash = SecureHash.randomSHA256()
|
||||
val txHash2 = SecureHash.randomSHA256()
|
||||
val dummyParty = Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public)
|
||||
val stateHistory: Map<StateRef, UniquenessProvider.ConsumingTx> = mapOf(StateRef(txhash, 0) to UniquenessProvider.ConsumingTx(txHash2, 1, dummyParty))
|
||||
val conflict = UniquenessProvider.Conflict(stateHistory)
|
||||
val instance = UniquenessException(conflict)
|
||||
|
||||
val instanceOnTheOtherSide = instance.serialize().deserialize()
|
||||
|
||||
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
||||
}
|
||||
}
|
@ -673,7 +673,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
|
||||
return notaryConfig.run {
|
||||
if (raft != null) {
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft)
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.clock, services.monitoringService.metrics, raft)
|
||||
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
|
||||
} else if (bftSMaRt != null) {
|
||||
if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported")
|
||||
|
@ -55,10 +55,11 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
|
||||
object NodeNotary
|
||||
|
||||
object NodeNotaryV1 : MappedSchema(schemaFamily = NodeNotary.javaClass, version = 1,
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.PersistentUniqueness::class.java,
|
||||
PersistentUniquenessProvider.PersistentNotaryCommit::class.java,
|
||||
RaftUniquenessProvider.RaftState::class.java,
|
||||
BFTNonValidatingNotaryService.PersistedCommittedState::class.java
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
||||
PersistentUniquenessProvider.Request::class.java,
|
||||
PersistentUniquenessProvider.CommittedState::class.java,
|
||||
RaftUniquenessProvider.CommittedState::class.java,
|
||||
BFTNonValidatingNotaryService.CommittedState::class.java
|
||||
))
|
||||
|
||||
// Required schemas are those used by internal Corda services
|
||||
|
@ -3,14 +3,11 @@ package net.corda.node.services.transactions
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.NotaryService
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -99,58 +96,52 @@ class BFTNonValidatingNotaryService(
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_states")
|
||||
class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty)
|
||||
: PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : PersistentUniquenessProvider.BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef> {
|
||||
private fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> {
|
||||
return AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||
val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
UniquenessProvider.ConsumingTx(
|
||||
id = SecureHash.parse(it.consumingTxHash),
|
||||
inputIndex = it.consumingIndex,
|
||||
requestingParty = Party(
|
||||
name = CordaX500Name.parse(it.party.name),
|
||||
owningKey = Crypto.decodePublicKey(it.party.owningKey))))
|
||||
},
|
||||
toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx ->
|
||||
PersistedCommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString(),
|
||||
consumingIndex = inputIndex,
|
||||
party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(),
|
||||
requestingParty.owningKey.encoded)
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
SecureHash.parse(it.consumingTxHash)
|
||||
)
|
||||
},
|
||||
persistentEntityClass = PersistedCommittedState::class.java
|
||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||
CommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString()
|
||||
)
|
||||
},
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
}
|
||||
|
||||
private class Replica(config: BFTSMaRtConfig,
|
||||
replicaId: Int,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>,
|
||||
services: ServiceHubInternal,
|
||||
notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
|
||||
|
||||
override fun executeCommand(command: ByteArray): ByteArray {
|
||||
val commitRequest = command.deserialize<BFTSMaRt.CommitRequest>()
|
||||
verifyRequest(commitRequest)
|
||||
val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity)
|
||||
val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature)
|
||||
return response.serialize().bytes
|
||||
}
|
||||
|
||||
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse {
|
||||
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSMaRt.ReplicaResponse {
|
||||
return try {
|
||||
val id = transaction.id
|
||||
val inputs = transaction.inputs
|
||||
val notary = transaction.notary
|
||||
if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow)
|
||||
if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary)
|
||||
commitInputStates(inputs, id, callerIdentity)
|
||||
commitInputStates(inputs, id, callerIdentity.name, requestSignature)
|
||||
log.debug { "Inputs committed successfully, signing $id" }
|
||||
BFTSMaRt.ReplicaResponse.Signature(sign(id))
|
||||
} catch (e: NotaryInternalException) {
|
||||
@ -166,7 +157,6 @@ class BFTNonValidatingNotaryService(
|
||||
val transaction = commitRequest.payload.coreTransaction
|
||||
val notarisationRequest = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
notarisationRequest.verifySignature(commitRequest.payload.requestSignature, commitRequest.callerIdentity)
|
||||
// TODO: persist the signature for traceability.
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,10 +15,10 @@ import bftsmart.tom.util.Extractor
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.internal.toTypedArray
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -30,6 +30,7 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Client
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Replica
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
@ -169,8 +170,8 @@ object BFTSMaRt {
|
||||
*/
|
||||
abstract class Replica(config: BFTSMaRtConfig,
|
||||
replicaId: Int,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
|
||||
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash,
|
||||
BFTNonValidatingNotaryService.CommittedState, PersistentStateRef>,
|
||||
protected val services: ServiceHubInternal,
|
||||
protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() {
|
||||
companion object {
|
||||
@ -215,28 +216,40 @@ object BFTSMaRt {
|
||||
*/
|
||||
abstract fun executeCommand(command: ByteArray): ByteArray?
|
||||
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
|
||||
log.debug { "Attempting to commit inputs for transaction: $txId" }
|
||||
val conflicts = mutableMapOf<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
|
||||
val conflicts = mutableMapOf<StateRef, SecureHash>()
|
||||
services.database.transaction {
|
||||
logRequest(txId, callerName, requestSignature)
|
||||
states.forEach { state ->
|
||||
commitLog[state]?.let { conflicts[state] = it }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
|
||||
commitLog[stateRef] = txInfo
|
||||
states.forEach { stateRef ->
|
||||
commitLog[stateRef] = txId
|
||||
}
|
||||
} else {
|
||||
log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
|
||||
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflict)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logRequest(txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
|
||||
val request = PersistentUniquenessProvider.Request(
|
||||
consumingTxHash = txId.toString(),
|
||||
partyName = callerName.toString(),
|
||||
requestSignature = requestSignature.serialize().bytes,
|
||||
requestDate = services.clock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
/** Generates a signature over an arbitrary array of bytes. */
|
||||
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
|
||||
return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) }
|
||||
@ -253,18 +266,25 @@ object BFTSMaRt {
|
||||
// - Add streaming to support large data sets.
|
||||
override fun getSnapshot(): ByteArray {
|
||||
// LinkedHashMap for deterministic serialisation
|
||||
val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
services.database.transaction {
|
||||
commitLog.allPersisted().forEach { m[it.first] = it.second }
|
||||
val committedStates = LinkedHashMap<StateRef, SecureHash>()
|
||||
val requests = services.database.transaction {
|
||||
commitLog.allPersisted().forEach { committedStates[it.first] = it.second }
|
||||
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)
|
||||
criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java))
|
||||
session.createQuery(criteriaQuery).resultList
|
||||
}
|
||||
return m.serialize().bytes
|
||||
return (committedStates to requests).serialize().bytes
|
||||
}
|
||||
|
||||
override fun installSnapshot(bytes: ByteArray) {
|
||||
val m = bytes.deserialize<LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>>()
|
||||
val (committedStates, requests) = bytes.deserialize<Pair<LinkedHashMap<StateRef, SecureHash>, List<PersistentUniquenessProvider.Request>>>()
|
||||
services.database.transaction {
|
||||
commitLog.clear()
|
||||
commitLog.putAll(m)
|
||||
commitLog.putAll(committedStates)
|
||||
val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java)
|
||||
deleteQuery.from(PersistentUniquenessProvider.Request::class.java)
|
||||
session.createQuery(deleteQuery).executeUpdate()
|
||||
requests.forEach { session.persist(it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,106 +0,0 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated
|
||||
* across a Copycat Raft cluster.
|
||||
*
|
||||
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the
|
||||
* new (or re-joining) cluster member.
|
||||
*/
|
||||
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine(), Snapshottable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
object Commands {
|
||||
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||
// removed from the log during minor or major compaction.
|
||||
//
|
||||
// Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on
|
||||
// purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes.
|
||||
// TODO: Cluster membership changes need testing.
|
||||
// TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to
|
||||
// setup a new cluster of the desired size and transfer the data.
|
||||
return Command.CompactionMode.FULL
|
||||
}
|
||||
}
|
||||
|
||||
class Size : Query<Int>
|
||||
class Get<out K, V>(val key: K) : Query<V?>
|
||||
}
|
||||
|
||||
private val map = db.transaction { createMap() }
|
||||
|
||||
/** Gets a value for the given [Commands.Get.key] */
|
||||
fun get(commit: Commit<Commands.Get<K, V>>): V? {
|
||||
commit.use {
|
||||
val key = it.operation().key
|
||||
return db.transaction { map[key]?.second }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [Commands.PutAll.entries] if no entry key already exists.
|
||||
*
|
||||
* @return map containing conflicting entries
|
||||
*/
|
||||
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
|
||||
commit.use {
|
||||
val index = commit.index()
|
||||
val conflicts = LinkedHashMap<K, V>()
|
||||
db.transaction {
|
||||
val entries = commit.operation().entries
|
||||
log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})")
|
||||
for (key in entries.keys) map[key]?.let { conflicts[key] = it.second }
|
||||
if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) })
|
||||
}
|
||||
return conflicts
|
||||
}
|
||||
}
|
||||
|
||||
fun size(commit: Commit<Commands.Size>): Int {
|
||||
commit.use { _ ->
|
||||
return db.transaction { map.size }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
||||
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
||||
* fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
db.transaction {
|
||||
writer.writeInt(map.size)
|
||||
map.allPersisted().forEach { writer.writeObject(it.first to it.second) }
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and adds them to [map]. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
db.transaction {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val (key, value) = reader.readObject<Pair<K, Pair<Long, V>>>()
|
||||
map[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -2,18 +2,13 @@ package net.corda.node.services.transactions
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.ComponentGroupEnum
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.TransactionParts
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) {
|
||||
/**
|
||||
@ -25,13 +20,11 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut
|
||||
* undo the commit of the input states (the exact mechanism still needs to be worked out).
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap { payload ->
|
||||
val transaction = payload.coreTransaction
|
||||
val request = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
validateRequest(request, payload.requestSignature)
|
||||
extractParts(transaction)
|
||||
}
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
val transaction = requestPayload.coreTransaction
|
||||
val request = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
validateRequestSignature(request, requestPayload.requestSignature)
|
||||
return extractParts(transaction)
|
||||
}
|
||||
|
||||
private fun extractParts(tx: CoreTransaction): TransactionParts {
|
||||
|
@ -1,62 +1,67 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.hibernate.annotations.Type
|
||||
import java.io.Serializable
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
@MappedSuperclass
|
||||
open class PersistentUniqueness(
|
||||
open class BaseComittedState(
|
||||
@EmbeddedId
|
||||
var id: PersistentStateRef = PersistentStateRef(),
|
||||
val id: PersistentStateRef,
|
||||
|
||||
@Column(name = "consuming_transaction_id")
|
||||
var consumingTxHash: String = "",
|
||||
|
||||
@Column(name = "consuming_input_index", length = 36)
|
||||
var consumingIndex: Int = 0,
|
||||
|
||||
@Embedded
|
||||
var party: PersistentParty = PersistentParty()
|
||||
val consumingTxHash: String
|
||||
)
|
||||
|
||||
@Embeddable
|
||||
data class PersistentParty(
|
||||
@Column(name = "requesting_party_name")
|
||||
var name: String = "",
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log")
|
||||
@CordaSerializable
|
||||
class Request(
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.AUTO)
|
||||
val id: Int = 0,
|
||||
|
||||
@Column(name = "requesting_party_key", length = 255)
|
||||
@Type(type = "corda-wrapper-binary")
|
||||
var owningKey: ByteArray = EMPTY_BYTE_ARRAY
|
||||
) : Serializable
|
||||
@Column(name = "consuming_transaction_id")
|
||||
val consumingTxHash: String,
|
||||
|
||||
@Embedded
|
||||
@Column(name = "requesting_party_name")
|
||||
var partyName: String,
|
||||
|
||||
@Lob
|
||||
@Column(name = "request_signature")
|
||||
val requestSignature: ByteArray,
|
||||
|
||||
@Column(name = "request_timestamp")
|
||||
var requestDate: Instant
|
||||
)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log")
|
||||
class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty) :
|
||||
PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
|
||||
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private class InnerState {
|
||||
val committedStates = createMap()
|
||||
@ -66,7 +71,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentNotaryCommit, PersistentStateRef> =
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
@ -74,46 +79,58 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
UniquenessProvider.ConsumingTx(
|
||||
id = SecureHash.parse(it.consumingTxHash),
|
||||
inputIndex = it.consumingIndex,
|
||||
requestingParty = Party(
|
||||
name = CordaX500Name.parse(it.party.name),
|
||||
owningKey = Crypto.decodePublicKey(it.party.owningKey))))
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
SecureHash.parse(it.consumingTxHash)
|
||||
)
|
||||
|
||||
},
|
||||
toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx ->
|
||||
PersistentNotaryCommit(
|
||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||
CommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString(),
|
||||
consumingIndex = inputIndex,
|
||||
party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.encoded)
|
||||
consumingTxHash = id.toString()
|
||||
)
|
||||
},
|
||||
persistentEntityClass = PersistentNotaryCommit::class.java
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
logRequest(txId, callerIdentity, requestSignature)
|
||||
val conflict = commitStates(states, txId)
|
||||
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
|
||||
}
|
||||
|
||||
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
val request = Request(
|
||||
consumingTxHash = txId.toString(),
|
||||
partyName = callerIdentity.name.toString(),
|
||||
requestSignature = requestSignature.serialize().bytes,
|
||||
requestDate = clock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
private fun commitStates(states: List<StateRef>, txId: SecureHash): Map<StateRef, StateConsumptionDetails>? {
|
||||
val conflict = mutex.locked {
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
val conflictingStates = LinkedHashMap<StateRef, SecureHash>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = committedStates.get(inputState)
|
||||
val consumingTx = committedStates[inputState]
|
||||
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||
}
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
|
||||
val conflict = conflictingStates.mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) }
|
||||
conflict
|
||||
} else {
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
committedStates[stateRef] = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
|
||||
states.forEach { stateRef ->
|
||||
committedStates[stateRef] = txId
|
||||
}
|
||||
log.debug("Successfully committed all input states: $states")
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
|
||||
return conflict
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,261 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.catalyst.buffer.BufferInput
|
||||
import io.atomix.catalyst.buffer.BufferOutput
|
||||
import io.atomix.catalyst.serializer.Serializer
|
||||
import io.atomix.catalyst.serializer.TypeSerializer
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.time.Clock
|
||||
|
||||
/**
|
||||
* Notarised contract state commit log, replicated across a Copycat Raft cluster.
|
||||
*
|
||||
* Copycat ony supports in-memory state machines, so we back the state with JDBC tables.
|
||||
* State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member.
|
||||
*/
|
||||
class RaftTransactionCommitLog<E, EK>(
|
||||
val db: CordaPersistence,
|
||||
val nodeClock: Clock,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, E, EK>
|
||||
) : StateMachine(), Snapshottable {
|
||||
object Commands {
|
||||
class CommitTransaction(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val requestingParty: String,
|
||||
val requestSignature: ByteArray
|
||||
) : Command<Map<StateRef, SecureHash>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||
// removed from the log during minor or major compaction.
|
||||
//
|
||||
// Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on
|
||||
// purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes.
|
||||
// TODO: Cluster membership changes need testing.
|
||||
// TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to
|
||||
// setup a new cluster of the desired size and transfer the data.
|
||||
return Command.CompactionMode.FULL
|
||||
}
|
||||
}
|
||||
|
||||
class Get(val key: StateRef) : Query<SecureHash?>
|
||||
}
|
||||
|
||||
private val map = db.transaction { createMap() }
|
||||
|
||||
/** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */
|
||||
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): Map<StateRef, SecureHash> {
|
||||
raftCommit.use {
|
||||
val index = it.index()
|
||||
val conflicts = LinkedHashMap<StateRef, SecureHash>()
|
||||
db.transaction {
|
||||
val commitCommand = raftCommit.command()
|
||||
logRequest(commitCommand)
|
||||
val states = commitCommand.states
|
||||
val txId = commitCommand.txId
|
||||
log.debug("State machine commit: storing entries with keys (${states.joinToString()})")
|
||||
for (state in states) {
|
||||
map[state]?.let { conflicts[state] = it.second }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
val entries = states.map { it to Pair(index, txId) }.toMap()
|
||||
map.putAll(entries)
|
||||
}
|
||||
}
|
||||
return conflicts
|
||||
}
|
||||
}
|
||||
|
||||
private fun logRequest(commitCommand: RaftTransactionCommitLog.Commands.CommitTransaction) {
|
||||
val request = PersistentUniquenessProvider.Request(
|
||||
consumingTxHash = commitCommand.txId.toString(),
|
||||
partyName = commitCommand.requestingParty,
|
||||
requestSignature = commitCommand.requestSignature,
|
||||
requestDate = nodeClock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
/** Gets the consuming transaction id for a given state reference. */
|
||||
fun get(commit: Commit<Commands.Get>): SecureHash? {
|
||||
commit.use {
|
||||
val key = it.operation().key
|
||||
return db.transaction { map[key]?.second }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all committed state and notarisation request entries to disk. Note that this operation does not
|
||||
* load all entries into memory, as the [SnapshotWriter] is using a disk-backed buffer internally, and iterating
|
||||
* map entries results in only a fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
db.transaction {
|
||||
writer.writeInt(map.size)
|
||||
map.allPersisted().forEach {
|
||||
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
writer.writeUnsignedShort(bytes.size)
|
||||
writer.writeObject(bytes)
|
||||
}
|
||||
|
||||
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)
|
||||
criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java))
|
||||
val results = session.createQuery(criteriaQuery).resultList
|
||||
|
||||
writer.writeInt(results.size)
|
||||
results.forEach {
|
||||
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
writer.writeUnsignedShort(bytes.size)
|
||||
writer.writeObject(bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and populates the committed state and notarisation request tables. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
db.transaction {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val bytes = ByteArray(reader.readUnsignedShort())
|
||||
reader.read(bytes)
|
||||
val (key, value) = bytes.deserialize<Pair<StateRef, Pair<Long, SecureHash>>>()
|
||||
map[key] = value
|
||||
}
|
||||
// Clean notarisation request log
|
||||
val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java)
|
||||
deleteQuery.from(PersistentUniquenessProvider.Request::class.java)
|
||||
session.createQuery(deleteQuery).executeUpdate()
|
||||
// Load and populate request log
|
||||
for (i in 1..reader.readInt()) {
|
||||
val bytes = ByteArray(reader.readUnsignedShort())
|
||||
reader.read(bytes)
|
||||
val request = bytes.deserialize<PersistentUniquenessProvider.Request>()
|
||||
session.persist(request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
// Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
|
||||
val serializer: Serializer by lazy {
|
||||
Serializer().apply {
|
||||
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) {
|
||||
object : TypeSerializer<Commands.CommitTransaction> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction,
|
||||
buffer: BufferOutput<out BufferOutput<*>>,
|
||||
serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.states.size)
|
||||
with(serializer) {
|
||||
obj.states.forEach {
|
||||
writeObject(it, buffer)
|
||||
}
|
||||
writeObject(obj.txId, buffer)
|
||||
}
|
||||
buffer.writeString(obj.requestingParty)
|
||||
buffer.writeInt(obj.requestSignature.size)
|
||||
buffer.write(obj.requestSignature)
|
||||
}
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.CommitTransaction>,
|
||||
buffer: BufferInput<out BufferInput<*>>,
|
||||
serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction {
|
||||
val stateCount = buffer.readUnsignedShort()
|
||||
val states = (1..stateCount).map {
|
||||
serializer.readObject<StateRef>(buffer)
|
||||
}
|
||||
val txId = serializer.readObject<SecureHash>(buffer)
|
||||
val name = buffer.readString()
|
||||
val signatureSize = buffer.readInt()
|
||||
val signature = ByteArray(signatureSize)
|
||||
buffer.read(signature)
|
||||
return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(RaftTransactionCommitLog.Commands.Get::class.java) {
|
||||
object : TypeSerializer<Commands.Get> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
serializer.writeObject(obj.key, buffer)
|
||||
}
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.Get>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get {
|
||||
val key = serializer.readObject<StateRef>(buffer)
|
||||
return RaftTransactionCommitLog.Commands.Get(key)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
register(StateRef::class.java) {
|
||||
object : TypeSerializer<StateRef> {
|
||||
override fun write(obj: StateRef, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeString(obj.encoded())
|
||||
}
|
||||
|
||||
override fun read(type: Class<StateRef>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): StateRef {
|
||||
return buffer.readString().parseStateRef()
|
||||
}
|
||||
}
|
||||
}
|
||||
registerAbstract(SecureHash::class.java) {
|
||||
object : TypeSerializer<SecureHash> {
|
||||
override fun write(obj: SecureHash, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.bytes.size)
|
||||
buffer.write(obj.bytes)
|
||||
}
|
||||
|
||||
override fun read(type: Class<SecureHash>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): SecureHash {
|
||||
val size = buffer.readUnsignedShort()
|
||||
val bytes = ByteArray(size)
|
||||
buffer.read(bytes)
|
||||
return SecureHash.SHA256(bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(LinkedHashMap::class.java) {
|
||||
object : TypeSerializer<LinkedHashMap<*, *>> {
|
||||
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeInt(obj.size)
|
||||
obj.forEach {
|
||||
with(serializer) {
|
||||
writeObject(it.key, buffer)
|
||||
writeObject(it.value, buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<*, *> {
|
||||
return LinkedHashMap<Any, Any>().apply {
|
||||
repeat(buffer.readInt()) {
|
||||
put(serializer.readObject(buffer), serializer.readObject(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -2,10 +2,6 @@ package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import io.atomix.catalyst.buffer.BufferInput
|
||||
import io.atomix.catalyst.buffer.BufferOutput
|
||||
import io.atomix.catalyst.serializer.Serializer
|
||||
import io.atomix.catalyst.serializer.TypeSerializer
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.catalyst.transport.Transport
|
||||
import io.atomix.catalyst.transport.netty.NettyTransport
|
||||
@ -20,14 +16,14 @@ import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.RaftConfig
|
||||
@ -36,11 +32,14 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import java.nio.file.Path
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.EmbeddedId
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Table
|
||||
|
||||
/**
|
||||
* A uniqueness provider that records committed input states in a distributed collection replicated and
|
||||
@ -51,37 +50,49 @@ import javax.persistence.*
|
||||
* to the cluster leader to be actioned.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
class RaftUniquenessProvider(
|
||||
private val transportConfiguration: NodeSSLConfiguration,
|
||||
private val db: CordaPersistence,
|
||||
private val clock: Clock,
|
||||
private val metrics: MetricRegistry,
|
||||
private val raftConfig: RaftConfig
|
||||
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<String, Pair<Long, Any>, RaftState, String> =
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { it },
|
||||
toPersistentEntityKey = { PersistentStateRef(it) },
|
||||
fromPersistentEntity = {
|
||||
Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)))
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
Pair(it.index, SecureHash.parse(it.value) as SecureHash))
|
||||
|
||||
},
|
||||
toPersistentEntity = { k: String, v: Pair<Long, Any> ->
|
||||
RaftState().apply {
|
||||
key = k
|
||||
value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
index = v.first
|
||||
}
|
||||
toPersistentEntity = { k: StateRef, v: Pair<Long, SecureHash> ->
|
||||
CommittedState(
|
||||
PersistentStateRef(k),
|
||||
v.second.toString(),
|
||||
v.first)
|
||||
|
||||
},
|
||||
persistentEntityClass = RaftState::class.java
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
fun String.parseStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_states")
|
||||
class RaftState(
|
||||
@Id
|
||||
@Column(name = "id")
|
||||
var key: String = "",
|
||||
|
||||
@Lob
|
||||
@Column(name = "state_value")
|
||||
var value: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
@Column(name = "state_index")
|
||||
class CommittedState(
|
||||
@EmbeddedId
|
||||
val id: PersistentStateRef,
|
||||
@Column(name = "consuming_transaction_id")
|
||||
var value: String = "",
|
||||
@Column(name = "raft_log_index")
|
||||
var index: Long = 0
|
||||
)
|
||||
|
||||
@ -100,41 +111,17 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
fun start() {
|
||||
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
||||
val stateMachineFactory = {
|
||||
DistributedImmutableMap(db, RaftUniquenessProvider.Companion::createMap)
|
||||
RaftTransactionCommitLog(db, clock, RaftUniquenessProvider.Companion::createMap)
|
||||
}
|
||||
val address = raftConfig.nodeAddress.let { Address(it.host, it.port) }
|
||||
val storage = buildStorage(storagePath)
|
||||
val transport = buildTransport(transportConfiguration)
|
||||
val serializer = Serializer().apply {
|
||||
// Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
|
||||
register(DistributedImmutableMap.Commands.PutAll::class.java) {
|
||||
object : TypeSerializer<DistributedImmutableMap.Commands.PutAll<*, *>> {
|
||||
override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>,
|
||||
buffer: BufferOutput<out BufferOutput<*>>,
|
||||
serializer: Serializer) {
|
||||
writeMap(obj.entries, buffer, serializer)
|
||||
}
|
||||
|
||||
override fun read(type: Class<DistributedImmutableMap.Commands.PutAll<*, *>>,
|
||||
buffer: BufferInput<out BufferInput<*>>,
|
||||
serializer: Serializer): DistributedImmutableMap.Commands.PutAll<Any, Any> {
|
||||
return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer))
|
||||
}
|
||||
}
|
||||
}
|
||||
register(LinkedHashMap::class.java) {
|
||||
object : TypeSerializer<LinkedHashMap<*, *>> {
|
||||
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) = writeMap(obj, buffer, serializer)
|
||||
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer) = readMap(buffer, serializer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withServerTransport(transport)
|
||||
.withSerializer(serializer)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
|
||||
val serverFuture = if (raftConfig.clusterAddresses.isNotEmpty()) {
|
||||
@ -151,7 +138,7 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
val client = CopycatClient.builder(address)
|
||||
.withTransport(transport) // TODO: use local transport for client-server communications
|
||||
.withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF)
|
||||
.withSerializer(serializer)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.withRecoveryStrategy(RecoveryStrategies.RECOVER)
|
||||
.build()
|
||||
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
||||
@ -200,50 +187,22 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
}
|
||||
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) }
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries))
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
states,
|
||||
txId,
|
||||
callerIdentity.name.toString(),
|
||||
requestSignature.serialize().bytes
|
||||
)
|
||||
val conflicts = client.submit(commitCommand).get()
|
||||
|
||||
if (conflicts.isNotEmpty()) {
|
||||
val conflictingStates = decode(conflicts).mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflictingStates)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
log.debug("All input states of transaction $txId have been committed")
|
||||
}
|
||||
|
||||
/**
|
||||
* Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray
|
||||
* here to avoid having to define additional serializers for our custom types.
|
||||
*/
|
||||
private fun encode(items: List<Pair<StateRef, UniquenessProvider.ConsumingTx>>): Map<String, ByteArray> {
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
return items.map { it.first.encoded() to it.second.serialize().bytes }.toMap()
|
||||
}
|
||||
|
||||
private fun decode(items: Map<String, ByteArray>): Map<StateRef, UniquenessProvider.ConsumingTx> {
|
||||
fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
return items.map { it.key.toStateRef() to it.value.deserialize<UniquenessProvider.ConsumingTx>() }.toMap()
|
||||
}
|
||||
}
|
||||
|
||||
private fun writeMap(map: Map<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) = with(map) {
|
||||
buffer.writeInt(size)
|
||||
forEach {
|
||||
with(serializer) {
|
||||
writeObject(it.key, buffer)
|
||||
writeObject(it.value, buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun readMap(buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<Any, Any> {
|
||||
return LinkedHashMap<Any, Any>().apply {
|
||||
repeat(buffer.readInt()) {
|
||||
put(serializer.readObject(buffer), serializer.readObject(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A simple Notary service that does not perform transaction validation */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -5,12 +5,11 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.security.SignatureException
|
||||
|
||||
/**
|
||||
@ -25,9 +24,10 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor
|
||||
* the transaction in question has all required signatures apart from the notary's.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
try {
|
||||
val stx = receiveTransaction()
|
||||
val stx = requestPayload.signedTransaction
|
||||
validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature)
|
||||
val notary = stx.notary
|
||||
checkNotary(notary)
|
||||
resolveAndContractVerify(stx)
|
||||
@ -43,15 +43,6 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun receiveTransaction(): SignedTransaction {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap {
|
||||
val stx = it.signedTransaction
|
||||
validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature)
|
||||
stx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveAndContractVerify(stx: SignedTransaction) {
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
|
||||
|
@ -8,7 +8,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
||||
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -1,9 +1,12 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
@ -19,6 +22,7 @@ import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
@ -28,6 +32,7 @@ class PersistentUniquenessProviderTests {
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
|
||||
private val txID = SecureHash.randomSHA256()
|
||||
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
|
||||
|
||||
private lateinit var database: CordaPersistence
|
||||
|
||||
@ -46,23 +51,25 @@ class PersistentUniquenessProviderTests {
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
database.transaction {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity)
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
database.transaction {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
provider.commit(inputs, txID, identity)
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
|
||||
val ex = assertFailsWith<NotaryInternalException> { provider.commit(inputs, txID, identity) }
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
|
@ -6,6 +6,8 @@ import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.concurrent.asCordaFuture
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -14,20 +16,22 @@ import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.freeLocalHostAndPort
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class DistributedImmutableMapTests {
|
||||
class RaftTransactionCommitLogTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
@Rule
|
||||
@ -40,12 +44,13 @@ class DistributedImmutableMapTests {
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel("+io.atomix")
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset("org.apache.activemq", "io.atomix")
|
||||
cluster.map { it.client.close().asCordaFuture() }.transpose().getOrThrow()
|
||||
cluster.map { it.server.shutdown().asCordaFuture() }.transpose().getOrThrow()
|
||||
databases.forEach { it.close() }
|
||||
@ -55,28 +60,38 @@ class DistributedImmutableMapTests {
|
||||
fun `stores entries correctly`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
val conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
|
||||
val value1 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key1"))
|
||||
val value2 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key2"))
|
||||
val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0]))
|
||||
val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1]))
|
||||
|
||||
assertEquals(value1.getOrThrow(), "value1")
|
||||
assertEquals(value2.getOrThrow(), "value2")
|
||||
assertEquals(value1.getOrThrow(), txId)
|
||||
assertEquals(value2.getOrThrow(), txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `returns conflict for duplicate entries`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
var conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
assertTrue { conflict == entries }
|
||||
|
||||
conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertEquals(conflict.keys, states.toSet())
|
||||
conflict.forEach { assertEquals(it.value, txId) }
|
||||
}
|
||||
|
||||
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
|
||||
@ -91,11 +106,12 @@ class DistributedImmutableMapTests {
|
||||
val address = Address(myAddress.host, myAddress.port)
|
||||
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(serverNameTablePrefix = "PORT_${myAddress.port}_"), rigorousMock(), NodeSchemaService(includeNotarySchemas = true))
|
||||
databases.add(database)
|
||||
val stateMachineFactory = { DistributedImmutableMap(database, RaftUniquenessProvider.Companion::createMap) }
|
||||
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) }
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
|
||||
val serverInitFuture = if (clusterAddress != null) {
|
||||
@ -107,6 +123,7 @@ class DistributedImmutableMapTests {
|
||||
|
||||
val client = CopycatClient.builder(address)
|
||||
.withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) }
|
||||
}
|
@ -5,14 +5,13 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import java.security.PublicKey
|
||||
import java.security.SignatureException
|
||||
@ -26,7 +25,7 @@ import java.security.SignatureException
|
||||
// START 1
|
||||
@CordaService
|
||||
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
@ -43,9 +42,10 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating
|
||||
* transaction dependency chain.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
try {
|
||||
val stx = receiveTransaction()
|
||||
val stx = requestPayload.signedTransaction
|
||||
validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature)
|
||||
val notary = stx.notary
|
||||
checkNotary(notary)
|
||||
verifySignatures(stx)
|
||||
@ -61,15 +61,6 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun receiveTransaction(): SignedTransaction {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap {
|
||||
val stx = it.signedTransaction
|
||||
validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature)
|
||||
stx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveAndContractVerify(stx: SignedTransaction) {
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
|
||||
|
Loading…
Reference in New Issue
Block a user