mirror of
https://github.com/corda/corda.git
synced 2024-12-25 23:51:10 +00:00
Merge remote-tracking branch 'open/master' into andrius-merge-1503
This commit is contained in:
commit
0d1e9694ae
@ -1469,7 +1469,7 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net
|
||||
@co.paralleluniverse.fibers.Suspendable protected final void checkNotary(net.corda.core.identity.Party)
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.FlowSession getOtherSideSession()
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService()
|
||||
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.TransactionParts receiveAndVerifyTx()
|
||||
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload)
|
||||
##
|
||||
public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic
|
||||
public <init>(net.corda.core.flows.FlowSession)
|
||||
@ -2078,7 +2078,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService
|
||||
public <init>()
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
|
||||
@org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
|
||||
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
|
||||
@ -2094,7 +2094,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer
|
||||
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError()
|
||||
##
|
||||
public interface net.corda.core.node.services.UniquenessProvider
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
||||
public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature)
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object
|
||||
public <init>(Map)
|
||||
|
@ -137,7 +137,7 @@ class NotaryFlow {
|
||||
* It checks that the time-window command is valid (if present) and commits the input state, or returns a conflict
|
||||
* if any of the input states have been previously committed.
|
||||
*
|
||||
* Additional transaction validation logic can be added when implementing [receiveAndVerifyTx].
|
||||
* Additional transaction validation logic can be added when implementing [validateRequest].
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
|
||||
@ -147,13 +147,15 @@ class NotaryFlow {
|
||||
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
|
||||
"We are not a notary on the network"
|
||||
}
|
||||
|
||||
val requestPayload = otherSideSession.receive<NotarisationPayload>().unwrap { it }
|
||||
var txId: SecureHash? = null
|
||||
try {
|
||||
val parts = receiveAndVerifyTx()
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
service.validateTimeWindow(parts.timestamp)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature)
|
||||
signTransactionAndSendResponse(txId)
|
||||
} catch (e: NotaryInternalException) {
|
||||
throw NotaryException(e.error, txId)
|
||||
@ -162,11 +164,10 @@ class NotaryFlow {
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement custom logic to receive the transaction to notarise, and perform verification based on validity and
|
||||
* privacy requirements.
|
||||
* Implement custom logic to perform transaction verification based on validity and privacy requirements.
|
||||
*/
|
||||
@Suspendable
|
||||
abstract fun receiveAndVerifyTx(): TransactionParts
|
||||
protected abstract fun validateRequest(requestPayload: NotarisationPayload): TransactionParts
|
||||
|
||||
/** Check if transaction is intended to be signed by this notary. */
|
||||
@Suspendable
|
||||
|
@ -407,10 +407,9 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade
|
||||
}
|
||||
|
||||
/** Verifies that the correct notarisation request was signed by the counterparty. */
|
||||
fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) {
|
||||
fun NotaryFlow.Service.validateRequestSignature(request: NotarisationRequest, signature: NotarisationRequestSignature) {
|
||||
val requestingParty = otherSideSession.counterparty
|
||||
request.verifySignature(signature, requestingParty)
|
||||
// TODO: persist the signature for traceability. Do we need to persist the request as well?
|
||||
}
|
||||
|
||||
/** Creates a signature over the notarisation request using the legal identity key. */
|
||||
|
@ -89,9 +89,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||
* this method does not throw an exception when input states are present multiple times within the transaction.
|
||||
*/
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party) {
|
||||
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) {
|
||||
try {
|
||||
uniquenessProvider.commit(inputs, txId, caller)
|
||||
uniquenessProvider.commit(inputs, txId, caller, requestSignature)
|
||||
} catch (e: NotaryInternalException) {
|
||||
if (e.error is NotaryError.Conflict) {
|
||||
val conflicts = inputs.filterIndexed { _, stateRef ->
|
||||
|
@ -13,6 +13,7 @@ package net.corda.core.node.services
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
@ -24,11 +25,12 @@ import net.corda.core.serialization.CordaSerializable
|
||||
*/
|
||||
interface UniquenessProvider {
|
||||
/** Commits all input states of the given transaction. */
|
||||
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party)
|
||||
fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature)
|
||||
|
||||
/** Specifies the consuming transaction for every conflicting state. */
|
||||
@CordaSerializable
|
||||
@Deprecated("No longer used due to potential privacy leak")
|
||||
@Suppress("DEPRECATION")
|
||||
data class Conflict(val stateHistory: Map<StateRef, ConsumingTx>)
|
||||
|
||||
/**
|
||||
@ -36,6 +38,7 @@ interface UniquenessProvider {
|
||||
* the caller identity requesting the commit.
|
||||
*/
|
||||
@CordaSerializable
|
||||
@Deprecated("No longer used")
|
||||
data class ConsumingTx(val id: SecureHash, val inputIndex: Int, val requestingParty: Party)
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,31 @@
|
||||
package net.corda.core.serialization
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NotaryExceptionSerializationTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Test
|
||||
fun testSerializationRoundTrip() {
|
||||
val txhash = SecureHash.randomSHA256()
|
||||
val stateHistory: Map<StateRef, StateConsumptionDetails> = mapOf(
|
||||
StateRef(txhash, 0) to StateConsumptionDetails(txhash.sha256())
|
||||
)
|
||||
val error = NotaryError.Conflict(txhash, stateHistory)
|
||||
val instance = NotaryException(error)
|
||||
val instanceOnTheOtherSide = instance.serialize().bytes.deserialize<NotaryException>()
|
||||
|
||||
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
||||
}
|
||||
}
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.core.serialization
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.UniquenessException
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class UniquenessExceptionSerializationTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Test
|
||||
fun testSerializationRoundTrip() {
|
||||
val txhash = SecureHash.randomSHA256()
|
||||
val txHash2 = SecureHash.randomSHA256()
|
||||
val dummyParty = Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public)
|
||||
val stateHistory: Map<StateRef, UniquenessProvider.ConsumingTx> = mapOf(StateRef(txhash, 0) to UniquenessProvider.ConsumingTx(txHash2, 1, dummyParty))
|
||||
val conflict = UniquenessProvider.Conflict(stateHistory)
|
||||
val instance = UniquenessException(conflict)
|
||||
|
||||
val instanceOnTheOtherSide = instance.serialize().deserialize()
|
||||
|
||||
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
||||
}
|
||||
}
|
@ -13,6 +13,10 @@ UNRELEASED
|
||||
|
||||
* Introduced a placeholder for custom properties within ``node.conf``; the property key is "custom".
|
||||
|
||||
* java.math.BigInteger serialization support added.
|
||||
|
||||
* java.security.cert.CRLReason added to the default Whitelist.
|
||||
|
||||
* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster
|
||||
where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than
|
||||
one node with the legal name is found.
|
||||
|
@ -63,6 +63,7 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PrivateKeySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ThrowableSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BigDecimalSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.CurrencySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OpaqueBytesSubSequenceSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InstantSerializer(this))
|
||||
|
@ -0,0 +1,11 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp.custom
|
||||
|
||||
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||
import java.math.BigInteger
|
||||
|
||||
/**
|
||||
* A serializer for [BigInteger], utilising the string based helper. [BigInteger] seems to have no import/export
|
||||
* features that are precision independent other than via a string. The format of the string is discussed in the
|
||||
* documentation for [BigInteger.toString].
|
||||
*/
|
||||
object BigIntegerSerializer : CustomSerializer.ToString<BigInteger>(BigInteger::class.java)
|
@ -50,6 +50,7 @@ import java.io.ByteArrayInputStream
|
||||
import java.io.IOException
|
||||
import java.io.NotSerializableException
|
||||
import java.math.BigDecimal
|
||||
import java.math.BigInteger
|
||||
import java.time.*
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
@ -986,6 +987,21 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
assertEquals(objCopy.a, objCopy.b)
|
||||
}
|
||||
|
||||
data class BigIntegers(val a: BigInteger, val b: BigInteger)
|
||||
|
||||
@Test
|
||||
fun `test BigInteger custom serializer`() {
|
||||
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory.register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer)
|
||||
|
||||
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory2.register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer)
|
||||
|
||||
val obj = BigIntegers(BigInteger.TEN, BigInteger.TEN)
|
||||
val objCopy = serdes(obj, factory, factory2)
|
||||
assertEquals(objCopy.a, objCopy.b)
|
||||
}
|
||||
|
||||
data class ByteArrays(val a: ByteArray, val b: ByteArray)
|
||||
|
||||
@Test
|
||||
|
@ -692,7 +692,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
return notaryConfig.run {
|
||||
when {
|
||||
raft != null -> {
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft)
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.clock, services.monitoringService.metrics, raft)
|
||||
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
|
||||
}
|
||||
bftSMaRt != null -> {
|
||||
|
@ -66,11 +66,12 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
|
||||
object NodeNotary
|
||||
|
||||
object NodeNotaryV1 : MappedSchema(schemaFamily = NodeNotary.javaClass, version = 1,
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.PersistentUniqueness::class.java,
|
||||
PersistentUniquenessProvider.PersistentNotaryCommit::class.java,
|
||||
RaftUniquenessProvider.RaftState::class.java,
|
||||
BFTNonValidatingNotaryService.PersistedCommittedState::class.java
|
||||
)){
|
||||
mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java,
|
||||
PersistentUniquenessProvider.Request::class.java,
|
||||
PersistentUniquenessProvider.CommittedState::class.java,
|
||||
RaftUniquenessProvider.CommittedState::class.java,
|
||||
BFTNonValidatingNotaryService.CommittedState::class.java
|
||||
)) {
|
||||
override val migrationResource = "node-notary.changelog-master"
|
||||
}
|
||||
|
||||
|
@ -13,14 +13,11 @@ package net.corda.node.services.transactions
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.NotaryService
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -109,58 +106,52 @@ class BFTNonValidatingNotaryService(
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_states")
|
||||
class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty)
|
||||
: PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : PersistentUniquenessProvider.BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef> {
|
||||
private fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> {
|
||||
return AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||
val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
UniquenessProvider.ConsumingTx(
|
||||
id = SecureHash.parse(it.consumingTxHash),
|
||||
inputIndex = it.consumingIndex,
|
||||
requestingParty = Party(
|
||||
name = CordaX500Name.parse(it.party.name),
|
||||
owningKey = Crypto.decodePublicKey(it.party.owningKey))))
|
||||
},
|
||||
toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx ->
|
||||
PersistedCommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString(),
|
||||
consumingIndex = inputIndex,
|
||||
party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(),
|
||||
requestingParty.owningKey.encoded)
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
SecureHash.parse(it.consumingTxHash)
|
||||
)
|
||||
},
|
||||
persistentEntityClass = PersistedCommittedState::class.java
|
||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||
CommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString()
|
||||
)
|
||||
},
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
}
|
||||
|
||||
private class Replica(config: BFTSMaRtConfig,
|
||||
replicaId: Int,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef>,
|
||||
services: ServiceHubInternal,
|
||||
notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
|
||||
|
||||
override fun executeCommand(command: ByteArray): ByteArray {
|
||||
val commitRequest = command.deserialize<BFTSMaRt.CommitRequest>()
|
||||
verifyRequest(commitRequest)
|
||||
val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity)
|
||||
val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature)
|
||||
return response.serialize().bytes
|
||||
}
|
||||
|
||||
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse {
|
||||
private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSMaRt.ReplicaResponse {
|
||||
return try {
|
||||
val id = transaction.id
|
||||
val inputs = transaction.inputs
|
||||
val notary = transaction.notary
|
||||
if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow)
|
||||
if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary)
|
||||
commitInputStates(inputs, id, callerIdentity)
|
||||
commitInputStates(inputs, id, callerIdentity.name, requestSignature)
|
||||
log.debug { "Inputs committed successfully, signing $id" }
|
||||
BFTSMaRt.ReplicaResponse.Signature(sign(id))
|
||||
} catch (e: NotaryInternalException) {
|
||||
@ -176,7 +167,6 @@ class BFTNonValidatingNotaryService(
|
||||
val transaction = commitRequest.payload.coreTransaction
|
||||
val notarisationRequest = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
notarisationRequest.verifySignature(commitRequest.payload.requestSignature, commitRequest.callerIdentity)
|
||||
// TODO: persist the signature for traceability.
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,10 +25,10 @@ import bftsmart.tom.util.Extractor
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.internal.toTypedArray
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -40,6 +40,7 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Client
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Replica
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
@ -179,8 +180,8 @@ object BFTSMaRt {
|
||||
*/
|
||||
abstract class Replica(config: BFTSMaRtConfig,
|
||||
replicaId: Int,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
|
||||
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, SecureHash,
|
||||
BFTNonValidatingNotaryService.CommittedState, PersistentStateRef>,
|
||||
protected val services: ServiceHubInternal,
|
||||
protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() {
|
||||
companion object {
|
||||
@ -225,28 +226,40 @@ object BFTSMaRt {
|
||||
*/
|
||||
abstract fun executeCommand(command: ByteArray): ByteArray?
|
||||
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
|
||||
log.debug { "Attempting to commit inputs for transaction: $txId" }
|
||||
val conflicts = mutableMapOf<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
|
||||
val conflicts = mutableMapOf<StateRef, SecureHash>()
|
||||
services.database.transaction {
|
||||
logRequest(txId, callerName, requestSignature)
|
||||
states.forEach { state ->
|
||||
commitLog[state]?.let { conflicts[state] = it }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" }
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
|
||||
commitLog[stateRef] = txInfo
|
||||
states.forEach { stateRef ->
|
||||
commitLog[stateRef] = txId
|
||||
}
|
||||
} else {
|
||||
log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" }
|
||||
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflict)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logRequest(txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) {
|
||||
val request = PersistentUniquenessProvider.Request(
|
||||
consumingTxHash = txId.toString(),
|
||||
partyName = callerName.toString(),
|
||||
requestSignature = requestSignature.serialize().bytes,
|
||||
requestDate = services.clock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
/** Generates a signature over an arbitrary array of bytes. */
|
||||
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
|
||||
return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) }
|
||||
@ -263,18 +276,25 @@ object BFTSMaRt {
|
||||
// - Add streaming to support large data sets.
|
||||
override fun getSnapshot(): ByteArray {
|
||||
// LinkedHashMap for deterministic serialisation
|
||||
val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
services.database.transaction {
|
||||
commitLog.allPersisted().forEach { m[it.first] = it.second }
|
||||
val committedStates = LinkedHashMap<StateRef, SecureHash>()
|
||||
val requests = services.database.transaction {
|
||||
commitLog.allPersisted().forEach { committedStates[it.first] = it.second }
|
||||
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)
|
||||
criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java))
|
||||
session.createQuery(criteriaQuery).resultList
|
||||
}
|
||||
return m.serialize().bytes
|
||||
return (committedStates to requests).serialize().bytes
|
||||
}
|
||||
|
||||
override fun installSnapshot(bytes: ByteArray) {
|
||||
val m = bytes.deserialize<LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>>()
|
||||
val (committedStates, requests) = bytes.deserialize<Pair<LinkedHashMap<StateRef, SecureHash>, List<PersistentUniquenessProvider.Request>>>()
|
||||
services.database.transaction {
|
||||
commitLog.clear()
|
||||
commitLog.putAll(m)
|
||||
commitLog.putAll(committedStates)
|
||||
val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java)
|
||||
deleteQuery.from(PersistentUniquenessProvider.Request::class.java)
|
||||
session.createQuery(deleteQuery).executeUpdate()
|
||||
requests.forEach { session.persist(it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,116 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated
|
||||
* across a Copycat Raft cluster.
|
||||
*
|
||||
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the
|
||||
* new (or re-joining) cluster member.
|
||||
*/
|
||||
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine(), Snapshottable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
object Commands {
|
||||
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||
// removed from the log during minor or major compaction.
|
||||
//
|
||||
// Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on
|
||||
// purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes.
|
||||
// TODO: Cluster membership changes need testing.
|
||||
// TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to
|
||||
// setup a new cluster of the desired size and transfer the data.
|
||||
return Command.CompactionMode.FULL
|
||||
}
|
||||
}
|
||||
|
||||
class Size : Query<Int>
|
||||
class Get<out K, V>(val key: K) : Query<V?>
|
||||
}
|
||||
|
||||
private val map = db.transaction { createMap() }
|
||||
|
||||
/** Gets a value for the given [Commands.Get.key] */
|
||||
fun get(commit: Commit<Commands.Get<K, V>>): V? {
|
||||
commit.use {
|
||||
val key = it.operation().key
|
||||
return db.transaction { map[key]?.second }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the given [Commands.PutAll.entries] if no entry key already exists.
|
||||
*
|
||||
* @return map containing conflicting entries
|
||||
*/
|
||||
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
|
||||
commit.use {
|
||||
val index = commit.index()
|
||||
val conflicts = LinkedHashMap<K, V>()
|
||||
db.transaction {
|
||||
val entries = commit.operation().entries
|
||||
log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})")
|
||||
for (key in entries.keys) map[key]?.let { conflicts[key] = it.second }
|
||||
if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) })
|
||||
}
|
||||
return conflicts
|
||||
}
|
||||
}
|
||||
|
||||
fun size(commit: Commit<Commands.Size>): Int {
|
||||
commit.use { _ ->
|
||||
return db.transaction { map.size }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
||||
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
||||
* fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
db.transaction {
|
||||
writer.writeInt(map.size)
|
||||
map.allPersisted().forEach { writer.writeObject(it.first to it.second) }
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and adds them to [map]. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
db.transaction {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val (key, value) = reader.readObject<Pair<K, Pair<Long, V>>>()
|
||||
map[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -12,18 +12,13 @@ package net.corda.node.services.transactions
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.ComponentGroupEnum
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.TransactionParts
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.NotaryChangeWireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) {
|
||||
/**
|
||||
@ -35,13 +30,11 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut
|
||||
* undo the commit of the input states (the exact mechanism still needs to be worked out).
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap { payload ->
|
||||
val transaction = payload.coreTransaction
|
||||
val request = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
validateRequest(request, payload.requestSignature)
|
||||
extractParts(transaction)
|
||||
}
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
val transaction = requestPayload.coreTransaction
|
||||
val request = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
validateRequestSignature(request, requestPayload.requestSignature)
|
||||
return extractParts(transaction)
|
||||
}
|
||||
|
||||
private fun extractParts(tx: CoreTransaction): TransactionParts {
|
||||
|
@ -11,62 +11,67 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.hibernate.annotations.Type
|
||||
import java.io.Serializable
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
@MappedSuperclass
|
||||
open class PersistentUniqueness(
|
||||
open class BaseComittedState(
|
||||
@EmbeddedId
|
||||
var id: PersistentStateRef = PersistentStateRef(),
|
||||
val id: PersistentStateRef,
|
||||
|
||||
@Column(name = "consuming_transaction_id")
|
||||
var consumingTxHash: String = "",
|
||||
|
||||
@Column(name = "consuming_input_index", length = 36)
|
||||
var consumingIndex: Int = 0,
|
||||
|
||||
@Embedded
|
||||
var party: PersistentParty = PersistentParty()
|
||||
val consumingTxHash: String
|
||||
)
|
||||
|
||||
@Embeddable
|
||||
data class PersistentParty(
|
||||
@Column(name = "requesting_party_name")
|
||||
var name: String = "",
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log")
|
||||
@CordaSerializable
|
||||
class Request(
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.AUTO)
|
||||
val id: Int = 0,
|
||||
|
||||
@Column(name = "requesting_party_key", length = 255)
|
||||
@Type(type = "corda-wrapper-binary")
|
||||
var owningKey: ByteArray = EMPTY_BYTE_ARRAY
|
||||
) : Serializable
|
||||
@Column(name = "consuming_transaction_id")
|
||||
val consumingTxHash: String,
|
||||
|
||||
@Embedded
|
||||
@Column(name = "requesting_party_name")
|
||||
var partyName: String,
|
||||
|
||||
@Lob
|
||||
@Column(name = "request_signature")
|
||||
val requestSignature: ByteArray,
|
||||
|
||||
@Column(name = "request_timestamp")
|
||||
var requestDate: Instant
|
||||
)
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log")
|
||||
class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty) :
|
||||
PersistentUniqueness(id, consumingTxHash, consumingIndex, party)
|
||||
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private class InnerState {
|
||||
val committedStates = createMap()
|
||||
@ -76,7 +81,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentNotaryCommit, PersistentStateRef> =
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
@ -84,46 +89,58 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
UniquenessProvider.ConsumingTx(
|
||||
id = SecureHash.parse(it.consumingTxHash),
|
||||
inputIndex = it.consumingIndex,
|
||||
requestingParty = Party(
|
||||
name = CordaX500Name.parse(it.party.name),
|
||||
owningKey = Crypto.decodePublicKey(it.party.owningKey))))
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
SecureHash.parse(it.consumingTxHash)
|
||||
)
|
||||
|
||||
},
|
||||
toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx ->
|
||||
PersistentNotaryCommit(
|
||||
toPersistentEntity = { (txHash, index): StateRef, id: SecureHash ->
|
||||
CommittedState(
|
||||
id = PersistentStateRef(txHash.toString(), index),
|
||||
consumingTxHash = id.toString(),
|
||||
consumingIndex = inputIndex,
|
||||
party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.encoded)
|
||||
consumingTxHash = id.toString()
|
||||
)
|
||||
},
|
||||
persistentEntityClass = PersistentNotaryCommit::class.java
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
}
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
logRequest(txId, callerIdentity, requestSignature)
|
||||
val conflict = commitStates(states, txId)
|
||||
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
|
||||
}
|
||||
|
||||
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
val request = Request(
|
||||
consumingTxHash = txId.toString(),
|
||||
partyName = callerIdentity.name.toString(),
|
||||
requestSignature = requestSignature.serialize().bytes,
|
||||
requestDate = clock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
private fun commitStates(states: List<StateRef>, txId: SecureHash): Map<StateRef, StateConsumptionDetails>? {
|
||||
val conflict = mutex.locked {
|
||||
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||
val conflictingStates = LinkedHashMap<StateRef, SecureHash>()
|
||||
for (inputState in states) {
|
||||
val consumingTx = committedStates.get(inputState)
|
||||
val consumingTx = committedStates[inputState]
|
||||
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||
}
|
||||
if (conflictingStates.isNotEmpty()) {
|
||||
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
|
||||
val conflict = conflictingStates.mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) }
|
||||
conflict
|
||||
} else {
|
||||
states.forEachIndexed { i, stateRef ->
|
||||
committedStates[stateRef] = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
|
||||
states.forEach { stateRef ->
|
||||
committedStates[stateRef] = txId
|
||||
}
|
||||
log.debug("Successfully committed all input states: $states")
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict))
|
||||
return conflict
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,261 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import io.atomix.catalyst.buffer.BufferInput
|
||||
import io.atomix.catalyst.buffer.BufferOutput
|
||||
import io.atomix.catalyst.serializer.Serializer
|
||||
import io.atomix.catalyst.serializer.TypeSerializer
|
||||
import io.atomix.copycat.Command
|
||||
import io.atomix.copycat.Query
|
||||
import io.atomix.copycat.server.Commit
|
||||
import io.atomix.copycat.server.Snapshottable
|
||||
import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import java.time.Clock
|
||||
|
||||
/**
|
||||
* Notarised contract state commit log, replicated across a Copycat Raft cluster.
|
||||
*
|
||||
* Copycat ony supports in-memory state machines, so we back the state with JDBC tables.
|
||||
* State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member.
|
||||
*/
|
||||
class RaftTransactionCommitLog<E, EK>(
|
||||
val db: CordaPersistence,
|
||||
val nodeClock: Clock,
|
||||
createMap: () -> AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, E, EK>
|
||||
) : StateMachine(), Snapshottable {
|
||||
object Commands {
|
||||
class CommitTransaction(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val requestingParty: String,
|
||||
val requestSignature: ByteArray
|
||||
) : Command<Map<StateRef, SecureHash>> {
|
||||
override fun compaction(): Command.CompactionMode {
|
||||
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||
// removed from the log during minor or major compaction.
|
||||
//
|
||||
// Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on
|
||||
// purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes.
|
||||
// TODO: Cluster membership changes need testing.
|
||||
// TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to
|
||||
// setup a new cluster of the desired size and transfer the data.
|
||||
return Command.CompactionMode.FULL
|
||||
}
|
||||
}
|
||||
|
||||
class Get(val key: StateRef) : Query<SecureHash?>
|
||||
}
|
||||
|
||||
private val map = db.transaction { createMap() }
|
||||
|
||||
/** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */
|
||||
fun commitTransaction(raftCommit: Commit<Commands.CommitTransaction>): Map<StateRef, SecureHash> {
|
||||
raftCommit.use {
|
||||
val index = it.index()
|
||||
val conflicts = LinkedHashMap<StateRef, SecureHash>()
|
||||
db.transaction {
|
||||
val commitCommand = raftCommit.command()
|
||||
logRequest(commitCommand)
|
||||
val states = commitCommand.states
|
||||
val txId = commitCommand.txId
|
||||
log.debug("State machine commit: storing entries with keys (${states.joinToString()})")
|
||||
for (state in states) {
|
||||
map[state]?.let { conflicts[state] = it.second }
|
||||
}
|
||||
if (conflicts.isEmpty()) {
|
||||
val entries = states.map { it to Pair(index, txId) }.toMap()
|
||||
map.putAll(entries)
|
||||
}
|
||||
}
|
||||
return conflicts
|
||||
}
|
||||
}
|
||||
|
||||
private fun logRequest(commitCommand: RaftTransactionCommitLog.Commands.CommitTransaction) {
|
||||
val request = PersistentUniquenessProvider.Request(
|
||||
consumingTxHash = commitCommand.txId.toString(),
|
||||
partyName = commitCommand.requestingParty,
|
||||
requestSignature = commitCommand.requestSignature,
|
||||
requestDate = nodeClock.instant()
|
||||
)
|
||||
val session = currentDBSession()
|
||||
session.persist(request)
|
||||
}
|
||||
|
||||
/** Gets the consuming transaction id for a given state reference. */
|
||||
fun get(commit: Commit<Commands.Get>): SecureHash? {
|
||||
commit.use {
|
||||
val key = it.operation().key
|
||||
return db.transaction { map[key]?.second }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes out all committed state and notarisation request entries to disk. Note that this operation does not
|
||||
* load all entries into memory, as the [SnapshotWriter] is using a disk-backed buffer internally, and iterating
|
||||
* map entries results in only a fixed number of recently accessed entries to ever be kept in memory.
|
||||
*/
|
||||
override fun snapshot(writer: SnapshotWriter) {
|
||||
db.transaction {
|
||||
writer.writeInt(map.size)
|
||||
map.allPersisted().forEach {
|
||||
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
writer.writeUnsignedShort(bytes.size)
|
||||
writer.writeObject(bytes)
|
||||
}
|
||||
|
||||
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)
|
||||
criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java))
|
||||
val results = session.createQuery(criteriaQuery).resultList
|
||||
|
||||
writer.writeInt(results.size)
|
||||
results.forEach {
|
||||
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
writer.writeUnsignedShort(bytes.size)
|
||||
writer.writeObject(bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads entries from disk and populates the committed state and notarisation request tables. */
|
||||
override fun install(reader: SnapshotReader) {
|
||||
val size = reader.readInt()
|
||||
db.transaction {
|
||||
map.clear()
|
||||
// TODO: read & put entries in batches
|
||||
for (i in 1..size) {
|
||||
val bytes = ByteArray(reader.readUnsignedShort())
|
||||
reader.read(bytes)
|
||||
val (key, value) = bytes.deserialize<Pair<StateRef, Pair<Long, SecureHash>>>()
|
||||
map[key] = value
|
||||
}
|
||||
// Clean notarisation request log
|
||||
val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java)
|
||||
deleteQuery.from(PersistentUniquenessProvider.Request::class.java)
|
||||
session.createQuery(deleteQuery).executeUpdate()
|
||||
// Load and populate request log
|
||||
for (i in 1..reader.readInt()) {
|
||||
val bytes = ByteArray(reader.readUnsignedShort())
|
||||
reader.read(bytes)
|
||||
val request = bytes.deserialize<PersistentUniquenessProvider.Request>()
|
||||
session.persist(request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
// Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
|
||||
val serializer: Serializer by lazy {
|
||||
Serializer().apply {
|
||||
register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) {
|
||||
object : TypeSerializer<Commands.CommitTransaction> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction,
|
||||
buffer: BufferOutput<out BufferOutput<*>>,
|
||||
serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.states.size)
|
||||
with(serializer) {
|
||||
obj.states.forEach {
|
||||
writeObject(it, buffer)
|
||||
}
|
||||
writeObject(obj.txId, buffer)
|
||||
}
|
||||
buffer.writeString(obj.requestingParty)
|
||||
buffer.writeInt(obj.requestSignature.size)
|
||||
buffer.write(obj.requestSignature)
|
||||
}
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.CommitTransaction>,
|
||||
buffer: BufferInput<out BufferInput<*>>,
|
||||
serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction {
|
||||
val stateCount = buffer.readUnsignedShort()
|
||||
val states = (1..stateCount).map {
|
||||
serializer.readObject<StateRef>(buffer)
|
||||
}
|
||||
val txId = serializer.readObject<SecureHash>(buffer)
|
||||
val name = buffer.readString()
|
||||
val signatureSize = buffer.readInt()
|
||||
val signature = ByteArray(signatureSize)
|
||||
buffer.read(signature)
|
||||
return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(RaftTransactionCommitLog.Commands.Get::class.java) {
|
||||
object : TypeSerializer<Commands.Get> {
|
||||
override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
serializer.writeObject(obj.key, buffer)
|
||||
}
|
||||
|
||||
override fun read(type: Class<RaftTransactionCommitLog.Commands.Get>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get {
|
||||
val key = serializer.readObject<StateRef>(buffer)
|
||||
return RaftTransactionCommitLog.Commands.Get(key)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
register(StateRef::class.java) {
|
||||
object : TypeSerializer<StateRef> {
|
||||
override fun write(obj: StateRef, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeString(obj.encoded())
|
||||
}
|
||||
|
||||
override fun read(type: Class<StateRef>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): StateRef {
|
||||
return buffer.readString().parseStateRef()
|
||||
}
|
||||
}
|
||||
}
|
||||
registerAbstract(SecureHash::class.java) {
|
||||
object : TypeSerializer<SecureHash> {
|
||||
override fun write(obj: SecureHash, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeUnsignedShort(obj.bytes.size)
|
||||
buffer.write(obj.bytes)
|
||||
}
|
||||
|
||||
override fun read(type: Class<SecureHash>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): SecureHash {
|
||||
val size = buffer.readUnsignedShort()
|
||||
val bytes = ByteArray(size)
|
||||
buffer.read(bytes)
|
||||
return SecureHash.SHA256(bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
register(LinkedHashMap::class.java) {
|
||||
object : TypeSerializer<LinkedHashMap<*, *>> {
|
||||
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) {
|
||||
buffer.writeInt(obj.size)
|
||||
obj.forEach {
|
||||
with(serializer) {
|
||||
writeObject(it.key, buffer)
|
||||
writeObject(it.value, buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<*, *> {
|
||||
return LinkedHashMap<Any, Any>().apply {
|
||||
repeat(buffer.readInt()) {
|
||||
put(serializer.readObject(buffer), serializer.readObject(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -12,10 +12,6 @@ package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import io.atomix.catalyst.buffer.BufferInput
|
||||
import io.atomix.catalyst.buffer.BufferOutput
|
||||
import io.atomix.catalyst.serializer.Serializer
|
||||
import io.atomix.catalyst.serializer.TypeSerializer
|
||||
import io.atomix.catalyst.transport.Address
|
||||
import io.atomix.catalyst.transport.Transport
|
||||
import io.atomix.catalyst.transport.netty.NettyTransport
|
||||
@ -30,14 +26,14 @@ import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.RaftConfig
|
||||
@ -46,11 +42,14 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import java.nio.file.Path
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.*
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.EmbeddedId
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Table
|
||||
|
||||
/**
|
||||
* A uniqueness provider that records committed input states in a distributed collection replicated and
|
||||
@ -61,37 +60,49 @@ import javax.persistence.*
|
||||
* to the cluster leader to be actioned.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
class RaftUniquenessProvider(
|
||||
private val transportConfiguration: NodeSSLConfiguration,
|
||||
private val db: CordaPersistence,
|
||||
private val clock: Clock,
|
||||
private val metrics: MetricRegistry,
|
||||
private val raftConfig: RaftConfig
|
||||
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<String, Pair<Long, Any>, RaftState, String> =
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { it },
|
||||
toPersistentEntityKey = { PersistentStateRef(it) },
|
||||
fromPersistentEntity = {
|
||||
Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)))
|
||||
val txId = it.id.txId
|
||||
?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(
|
||||
StateRef(txhash = SecureHash.parse(txId), index = index),
|
||||
Pair(it.index, SecureHash.parse(it.value) as SecureHash))
|
||||
|
||||
},
|
||||
toPersistentEntity = { k: String, v: Pair<Long, Any> ->
|
||||
RaftState().apply {
|
||||
key = k
|
||||
value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
index = v.first
|
||||
}
|
||||
toPersistentEntity = { k: StateRef, v: Pair<Long, SecureHash> ->
|
||||
CommittedState(
|
||||
PersistentStateRef(k),
|
||||
v.second.toString(),
|
||||
v.first)
|
||||
|
||||
},
|
||||
persistentEntityClass = RaftState::class.java
|
||||
persistentEntityClass = CommittedState::class.java
|
||||
)
|
||||
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
fun String.parseStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_states")
|
||||
class RaftState(
|
||||
@Id
|
||||
@Column(name = "id")
|
||||
var key: String = "",
|
||||
|
||||
@Lob
|
||||
@Column(name = "state_value")
|
||||
var value: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
@Column(name = "state_index")
|
||||
class CommittedState(
|
||||
@EmbeddedId
|
||||
val id: PersistentStateRef,
|
||||
@Column(name = "consuming_transaction_id")
|
||||
var value: String = "",
|
||||
@Column(name = "raft_log_index")
|
||||
var index: Long = 0
|
||||
)
|
||||
|
||||
@ -110,41 +121,17 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
fun start() {
|
||||
log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}")
|
||||
val stateMachineFactory = {
|
||||
DistributedImmutableMap(db, RaftUniquenessProvider.Companion::createMap)
|
||||
RaftTransactionCommitLog(db, clock, RaftUniquenessProvider.Companion::createMap)
|
||||
}
|
||||
val address = raftConfig.nodeAddress.let { Address(it.host, it.port) }
|
||||
val storage = buildStorage(storagePath)
|
||||
val transport = buildTransport(transportConfiguration)
|
||||
val serializer = Serializer().apply {
|
||||
// Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
|
||||
register(DistributedImmutableMap.Commands.PutAll::class.java) {
|
||||
object : TypeSerializer<DistributedImmutableMap.Commands.PutAll<*, *>> {
|
||||
override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>,
|
||||
buffer: BufferOutput<out BufferOutput<*>>,
|
||||
serializer: Serializer) {
|
||||
writeMap(obj.entries, buffer, serializer)
|
||||
}
|
||||
|
||||
override fun read(type: Class<DistributedImmutableMap.Commands.PutAll<*, *>>,
|
||||
buffer: BufferInput<out BufferInput<*>>,
|
||||
serializer: Serializer): DistributedImmutableMap.Commands.PutAll<Any, Any> {
|
||||
return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer))
|
||||
}
|
||||
}
|
||||
}
|
||||
register(LinkedHashMap::class.java) {
|
||||
object : TypeSerializer<LinkedHashMap<*, *>> {
|
||||
override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) = writeMap(obj, buffer, serializer)
|
||||
override fun read(type: Class<LinkedHashMap<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer) = readMap(buffer, serializer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withServerTransport(transport)
|
||||
.withSerializer(serializer)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
|
||||
val serverFuture = if (raftConfig.clusterAddresses.isNotEmpty()) {
|
||||
@ -161,7 +148,7 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
val client = CopycatClient.builder(address)
|
||||
.withTransport(transport) // TODO: use local transport for client-server communications
|
||||
.withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF)
|
||||
.withSerializer(serializer)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.withRecoveryStrategy(RecoveryStrategies.RECOVER)
|
||||
.build()
|
||||
_clientFuture = serverFuture.thenCompose { client.connect(address) }
|
||||
@ -210,50 +197,22 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
}
|
||||
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) }
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
log.debug("Attempting to commit input states: ${states.joinToString()}")
|
||||
val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries))
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(
|
||||
states,
|
||||
txId,
|
||||
callerIdentity.name.toString(),
|
||||
requestSignature.serialize().bytes
|
||||
)
|
||||
val conflicts = client.submit(commitCommand).get()
|
||||
|
||||
if (conflicts.isNotEmpty()) {
|
||||
val conflictingStates = decode(conflicts).mapValues { StateConsumptionDetails(it.value.id.sha256()) }
|
||||
val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) }
|
||||
val error = NotaryError.Conflict(txId, conflictingStates)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
log.debug("All input states of transaction $txId have been committed")
|
||||
}
|
||||
|
||||
/**
|
||||
* Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray
|
||||
* here to avoid having to define additional serializers for our custom types.
|
||||
*/
|
||||
private fun encode(items: List<Pair<StateRef, UniquenessProvider.ConsumingTx>>): Map<String, ByteArray> {
|
||||
fun StateRef.encoded() = "$txhash:$index"
|
||||
return items.map { it.first.encoded() to it.second.serialize().bytes }.toMap()
|
||||
}
|
||||
|
||||
private fun decode(items: Map<String, ByteArray>): Map<StateRef, UniquenessProvider.ConsumingTx> {
|
||||
fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) }
|
||||
return items.map { it.key.toStateRef() to it.value.deserialize<UniquenessProvider.ConsumingTx>() }.toMap()
|
||||
}
|
||||
}
|
||||
|
||||
private fun writeMap(map: Map<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) = with(map) {
|
||||
buffer.writeInt(size)
|
||||
forEach {
|
||||
with(serializer) {
|
||||
writeObject(it.key, buffer)
|
||||
writeObject(it.value, buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun readMap(buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<Any, Any> {
|
||||
return LinkedHashMap<Any, Any>().apply {
|
||||
repeat(buffer.readInt()) {
|
||||
put(serializer.readObject(buffer), serializer.readObject(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A simple Notary service that does not perform transaction validation */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -15,12 +15,11 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import java.security.SignatureException
|
||||
|
||||
/**
|
||||
@ -35,9 +34,10 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor
|
||||
* the transaction in question has all required signatures apart from the notary's.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
try {
|
||||
val stx = receiveTransaction()
|
||||
val stx = requestPayload.signedTransaction
|
||||
validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature)
|
||||
val notary = stx.notary
|
||||
checkNotary(notary)
|
||||
resolveAndContractVerify(stx)
|
||||
@ -53,15 +53,6 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun receiveTransaction(): SignedTransaction {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap {
|
||||
val stx = it.signedTransaction
|
||||
validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature)
|
||||
stx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveAndContractVerify(stx: SignedTransaction) {
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
|
||||
|
@ -18,7 +18,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
||||
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -10,10 +10,13 @@
|
||||
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryInternalException
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
@ -29,6 +32,7 @@ import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
@ -38,6 +42,7 @@ class PersistentUniquenessProviderTests {
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
|
||||
private val txID = SecureHash.randomSHA256()
|
||||
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
|
||||
|
||||
private lateinit var database: CordaPersistence
|
||||
|
||||
@ -56,23 +61,25 @@ class PersistentUniquenessProviderTests {
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
database.transaction {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity)
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
database.transaction {
|
||||
val provider = PersistentUniquenessProvider()
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
provider.commit(inputs, txID, identity)
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
|
||||
val ex = assertFailsWith<NotaryInternalException> { provider.commit(inputs, txID, identity) }
|
||||
val ex = assertFailsWith<NotaryInternalException> {
|
||||
provider.commit(inputs, txID, identity, requestSignature)
|
||||
}
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
|
||||
val conflictCause = error.consumedStates[inputState]!!
|
||||
|
@ -16,6 +16,8 @@ import io.atomix.copycat.client.CopycatClient
|
||||
import io.atomix.copycat.server.CopycatServer
|
||||
import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.concurrent.asCordaFuture
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -24,20 +26,22 @@ import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.freeLocalHostAndPort
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class DistributedImmutableMapTests {
|
||||
class RaftTransactionCommitLogTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
@Rule
|
||||
@ -50,12 +54,13 @@ class DistributedImmutableMapTests {
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel("+io.atomix")
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset("org.apache.activemq", "io.atomix")
|
||||
cluster.map { it.client.close().asCordaFuture() }.transpose().getOrThrow()
|
||||
cluster.map { it.server.shutdown().asCordaFuture() }.transpose().getOrThrow()
|
||||
databases.forEach { it.close() }
|
||||
@ -65,28 +70,38 @@ class DistributedImmutableMapTests {
|
||||
fun `stores entries correctly`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
val conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
|
||||
val value1 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key1"))
|
||||
val value2 = client.submit(DistributedImmutableMap.Commands.Get<String, String>("key2"))
|
||||
val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0]))
|
||||
val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1]))
|
||||
|
||||
assertEquals(value1.getOrThrow(), "value1")
|
||||
assertEquals(value2.getOrThrow(), "value2")
|
||||
assertEquals(value1.getOrThrow(), txId)
|
||||
assertEquals(value2.getOrThrow(), txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `returns conflict for duplicate entries`() {
|
||||
val client = cluster.last().client
|
||||
|
||||
val entries = mapOf("key1" to "value1", "key2" to "value2")
|
||||
val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0))
|
||||
val txId: SecureHash = SecureHash.randomSHA256()
|
||||
val requestingPartyName = ALICE_NAME
|
||||
val requestSignature = ByteArray(1024)
|
||||
|
||||
var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature)
|
||||
var conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertTrue { conflict.isEmpty() }
|
||||
conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow()
|
||||
assertTrue { conflict == entries }
|
||||
|
||||
conflict = client.submit(commitCommand).getOrThrow()
|
||||
assertEquals(conflict.keys, states.toSet())
|
||||
conflict.forEach { assertEquals(it.value, txId) }
|
||||
}
|
||||
|
||||
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
|
||||
@ -101,11 +116,12 @@ class DistributedImmutableMapTests {
|
||||
val address = Address(myAddress.host, myAddress.port)
|
||||
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock(), NodeSchemaService(includeNotarySchemas = true))
|
||||
databases.add(database)
|
||||
val stateMachineFactory = { DistributedImmutableMap(database, RaftUniquenessProvider.Companion::createMap) }
|
||||
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) }
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
.withStorage(storage)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
|
||||
val serverInitFuture = if (clusterAddress != null) {
|
||||
@ -117,6 +133,7 @@ class DistributedImmutableMapTests {
|
||||
|
||||
val client = CopycatClient.builder(address)
|
||||
.withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF)
|
||||
.withSerializer(RaftTransactionCommitLog.serializer)
|
||||
.build()
|
||||
return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) }
|
||||
}
|
@ -22,7 +22,7 @@ class IRSDemoClientApi(hostAndPort: NetworkHostAndPort) {
|
||||
private val api = HttpApi.fromHostAndPort(hostAndPort, apiRoot)
|
||||
|
||||
fun runTrade(tradeId: String, oracleName: CordaX500Name) {
|
||||
val fileContents = IOUtils.toString(javaClass.classLoader.getResourceAsStream("net/corda/irs/simulation/example-irs-trade.json"), Charsets.UTF_8.name())
|
||||
val fileContents = IOUtils.toString(javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/example-irs-trade.json"), Charsets.UTF_8.name())
|
||||
val tradeFile = fileContents.replace("tradeXXX", tradeId).replace("oracleXXX", oracleName.toString())
|
||||
api.postJson("deals", tradeFile)
|
||||
}
|
||||
|
@ -139,7 +139,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
|
||||
// have the convenient copy() method that'd let us make small adjustments. Instead they're partly mutable.
|
||||
// TODO: We should revisit this in post-Excalibur cleanup and fix, e.g. by introducing an interface.
|
||||
|
||||
val irs = om.readValue<InterestRateSwap.State>(javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/trade.json")
|
||||
val resourceAsStream = javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/trade.json")
|
||||
?: error("Trade representation cannot be loaded.")
|
||||
val irs = om.readValue<InterestRateSwap.State>(resourceAsStream
|
||||
.reader()
|
||||
.readText()
|
||||
.replace("oracleXXX", RatesOracleNode.RATES_SERVICE_NAME.toString()))
|
||||
|
@ -15,14 +15,13 @@ import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.validateRequest
|
||||
import net.corda.core.internal.validateRequestSignature
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionWithSignatures
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import java.security.PublicKey
|
||||
import java.security.SignatureException
|
||||
@ -36,7 +35,7 @@ import java.security.SignatureException
|
||||
// START 1
|
||||
@CordaService
|
||||
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
@ -53,9 +52,10 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating
|
||||
* transaction dependency chain.
|
||||
*/
|
||||
@Suspendable
|
||||
override fun receiveAndVerifyTx(): TransactionParts {
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
try {
|
||||
val stx = receiveTransaction()
|
||||
val stx = requestPayload.signedTransaction
|
||||
validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature)
|
||||
val notary = stx.notary
|
||||
checkNotary(notary)
|
||||
verifySignatures(stx)
|
||||
@ -71,15 +71,6 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun receiveTransaction(): SignedTransaction {
|
||||
return otherSideSession.receive<NotarisationPayload>().unwrap {
|
||||
val stx = it.signedTransaction
|
||||
validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature)
|
||||
stx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun resolveAndContractVerify(stx: SignedTransaction) {
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
|
||||
|
Loading…
Reference in New Issue
Block a user