From 15e4449b38b5e8ecd0ef224baa842d631a5c0a53 Mon Sep 17 00:00:00 2001 From: Michal Kit Date: Thu, 15 Mar 2018 10:11:01 +0000 Subject: [PATCH 1/3] Adding BigInteger serialization support (#2804) * Adding BigInteger serialization support * Addressing review comments --- docs/source/changelog.rst | 4 ++++ .../amqp/AMQPSerializationScheme.kt | 1 + .../amqp/custom/BigIntegerSerializer.kt | 11 +++++++++++ .../amqp/SerializationOutputTests.kt | 16 ++++++++++++++++ 4 files changed, 32 insertions(+) create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/BigIntegerSerializer.kt diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 7626d7d312..a7a24e834e 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -13,6 +13,10 @@ from the previous milestone release. * Introduced a placeholder for custom properties within ``node.conf``; the property key is "custom". +* java.math.BigInteger serialization support added. + +* java.security.cert.CRLReason added to the default Whitelist. + .. _changelog_v3: Version 3.0 diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index f099ce7af5..b56cbea372 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -53,6 +53,7 @@ abstract class AbstractAMQPSerializationScheme(val cordappLoader: List) register(net.corda.nodeapi.internal.serialization.amqp.custom.PrivateKeySerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.ThrowableSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.BigDecimalSerializer) + register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.CurrencySerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.OpaqueBytesSubSequenceSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.InstantSerializer(this)) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/BigIntegerSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/BigIntegerSerializer.kt new file mode 100644 index 0000000000..268676c312 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/BigIntegerSerializer.kt @@ -0,0 +1,11 @@ +package net.corda.nodeapi.internal.serialization.amqp.custom + +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import java.math.BigInteger + +/** + * A serializer for [BigInteger], utilising the string based helper. [BigInteger] seems to have no import/export + * features that are precision independent other than via a string. The format of the string is discussed in the + * documentation for [BigInteger.toString]. + */ +object BigIntegerSerializer : CustomSerializer.ToString(BigInteger::class.java) \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt index 5fc4359e88..bf081d04ad 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt @@ -40,6 +40,7 @@ import java.io.ByteArrayInputStream import java.io.IOException import java.io.NotSerializableException import java.math.BigDecimal +import java.math.BigInteger import java.time.* import java.time.temporal.ChronoUnit import java.util.* @@ -976,6 +977,21 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi assertEquals(objCopy.a, objCopy.b) } + data class BigIntegers(val a: BigInteger, val b: BigInteger) + + @Test + fun `test BigInteger custom serializer`() { + val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()) + factory.register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer) + + val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()) + factory2.register(net.corda.nodeapi.internal.serialization.amqp.custom.BigIntegerSerializer) + + val obj = BigIntegers(BigInteger.TEN, BigInteger.TEN) + val objCopy = serdes(obj, factory, factory2) + assertEquals(objCopy.a, objCopy.b) + } + data class ByteArrays(val a: ByteArray, val b: ByteArray) @Test From e31d2b0cad6e444152426b8c740326ca0b84b805 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Thu, 15 Mar 2018 13:29:42 +0000 Subject: [PATCH 2/3] =?UTF-8?q?CORDA-1208:=20Notary=20service=20should=20p?= =?UTF-8?q?ersist=20the=20notarisation=20request=20si=E2=80=A6=20(#2823)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * CORDA-1208: Notary service should persist the notarisation request signature along with the committed input states. This required modifying the uniqueness provider interface to accept the signature in addition to input states. Until now the committed state log used to be stored as a map of (state reference -> (tranasction id, consuming party)). Adding the serialized signature would mean inflating each state entry by around 700 bytes, which would be grossly inefficient. Instead, two tables are now used: one for storing (state referece -> transaction id) map, and another for storing the notarisation request details (transaction id, consuming party, date, signature). * Update api - all of these changes are only related to custom notaries --- .ci/api-current.txt | 6 +- .../kotlin/net/corda/core/flows/NotaryFlow.kt | 13 +- .../net/corda/core/internal/InternalUtils.kt | 3 +- .../corda/core/node/services/NotaryService.kt | 4 +- .../core/node/services/UniquenessProvider.kt | 5 +- .../NotaryExceptionSerializationTest.kt | 31 +++ .../UniquenessExceptionSerializationTest.kt | 33 --- .../net/corda/node/internal/AbstractNode.kt | 2 +- .../node/services/schema/NodeSchemaService.kt | 9 +- .../BFTNonValidatingNotaryService.kt | 46 ++- .../node/services/transactions/BFTSMaRt.kt | 50 +++- .../transactions/DistributedImmutableMap.kt | 106 ------- .../transactions/NonValidatingNotaryFlow.kt | 23 +- .../PersistentUniquenessProvider.kt | 117 ++++---- .../transactions/RaftTransactionCommitLog.kt | 261 ++++++++++++++++++ .../transactions/RaftUniquenessProvider.kt | 141 ++++------ .../transactions/SimpleNotaryService.kt | 2 +- .../transactions/ValidatingNotaryFlow.kt | 17 +- .../transactions/ValidatingNotaryService.kt | 2 +- .../PersistentUniquenessProviderTests.kt | 19 +- ...ts.kt => RaftTransactionCommitLogTests.kt} | 47 +++- .../corda/notarydemo/MyCustomNotaryService.kt | 19 +- 22 files changed, 549 insertions(+), 407 deletions(-) create mode 100644 core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt delete mode 100644 core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt rename node/src/test/kotlin/net/corda/node/services/transactions/{DistributedImmutableMapTests.kt => RaftTransactionCommitLogTests.kt} (65%) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 8a30c60338..bf524bd54a 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1460,7 +1460,7 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net @co.paralleluniverse.fibers.Suspendable protected final void checkNotary(net.corda.core.identity.Party) @org.jetbrains.annotations.NotNull public final net.corda.core.flows.FlowSession getOtherSideSession() @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService() - @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.TransactionParts receiveAndVerifyTx() + @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload) ## public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic public (net.corda.core.flows.FlowSession) @@ -2066,7 +2066,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l ## @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService public () - public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) + public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) @org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog() @org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider() @@ -2082,7 +2082,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError() ## public interface net.corda.core.node.services.UniquenessProvider - public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) + public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object public (Map) diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index 2c3ef48413..2fd9dbf4b0 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -127,7 +127,7 @@ class NotaryFlow { * It checks that the time-window command is valid (if present) and commits the input state, or returns a conflict * if any of the input states have been previously committed. * - * Additional transaction validation logic can be added when implementing [receiveAndVerifyTx]. + * Additional transaction validation logic can be added when implementing [validateRequest]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic() { @@ -137,13 +137,15 @@ class NotaryFlow { check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) { "We are not a notary on the network" } + + val requestPayload = otherSideSession.receive().unwrap { it } var txId: SecureHash? = null try { - val parts = receiveAndVerifyTx() + val parts = validateRequest(requestPayload) txId = parts.id checkNotary(parts.notary) service.validateTimeWindow(parts.timestamp) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty) + service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature) signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) @@ -152,11 +154,10 @@ class NotaryFlow { } /** - * Implement custom logic to receive the transaction to notarise, and perform verification based on validity and - * privacy requirements. + * Implement custom logic to perform transaction verification based on validity and privacy requirements. */ @Suspendable - abstract fun receiveAndVerifyTx(): TransactionParts + protected abstract fun validateRequest(requestPayload: NotarisationPayload): TransactionParts /** Check if transaction is intended to be signed by this notary. */ @Suspendable diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index cfde75e90a..73e6d7819d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -397,10 +397,9 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade } /** Verifies that the correct notarisation request was signed by the counterparty. */ -fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) { +fun NotaryFlow.Service.validateRequestSignature(request: NotarisationRequest, signature: NotarisationRequestSignature) { val requestingParty = otherSideSession.counterparty request.verifySignature(signature, requestingParty) - // TODO: persist the signature for traceability. Do we need to persist the request as well? } /** Creates a signature over the notarisation request using the legal identity key. */ diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt index 0cb1e9fcf6..c7aa072214 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -79,9 +79,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that * this method does not throw an exception when input states are present multiple times within the transaction. */ - fun commitInputStates(inputs: List, txId: SecureHash, caller: Party) { + fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) { try { - uniquenessProvider.commit(inputs, txId, caller) + uniquenessProvider.commit(inputs, txId, caller, requestSignature) } catch (e: NotaryInternalException) { if (e.error is NotaryError.Conflict) { val conflicts = inputs.filterIndexed { _, stateRef -> diff --git a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt index 2873d22d74..5a2950d0d4 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt @@ -3,6 +3,7 @@ package net.corda.core.node.services import net.corda.core.CordaException import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable @@ -14,11 +15,12 @@ import net.corda.core.serialization.CordaSerializable */ interface UniquenessProvider { /** Commits all input states of the given transaction. */ - fun commit(states: List, txId: SecureHash, callerIdentity: Party) + fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) /** Specifies the consuming transaction for every conflicting state. */ @CordaSerializable @Deprecated("No longer used due to potential privacy leak") + @Suppress("DEPRECATION") data class Conflict(val stateHistory: Map) /** @@ -26,6 +28,7 @@ interface UniquenessProvider { * the caller identity requesting the commit. */ @CordaSerializable + @Deprecated("No longer used") data class ConsumingTx(val id: SecureHash, val inputIndex: Int, val requestingParty: Party) } diff --git a/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt new file mode 100644 index 0000000000..b1cd4e7d4f --- /dev/null +++ b/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt @@ -0,0 +1,31 @@ +package net.corda.core.serialization + +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryException +import net.corda.core.flows.StateConsumptionDetails +import net.corda.testing.core.SerializationEnvironmentRule +import org.junit.Rule +import org.junit.Test +import kotlin.test.assertEquals + +class NotaryExceptionSerializationTest { + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + @Test + fun testSerializationRoundTrip() { + val txhash = SecureHash.randomSHA256() + val stateHistory: Map = mapOf( + StateRef(txhash, 0) to StateConsumptionDetails(txhash.sha256()) + ) + val error = NotaryError.Conflict(txhash, stateHistory) + val instance = NotaryException(error) + val instanceOnTheOtherSide = instance.serialize().bytes.deserialize() + + assertEquals(instance.error, instanceOnTheOtherSide.error) + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt deleted file mode 100644 index f1ac6956a3..0000000000 --- a/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt +++ /dev/null @@ -1,33 +0,0 @@ -package net.corda.core.serialization - -import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.generateKeyPair -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.node.services.UniquenessException -import net.corda.core.node.services.UniquenessProvider -import net.corda.testing.core.SerializationEnvironmentRule -import org.junit.Rule -import org.junit.Test -import kotlin.test.assertEquals - -class UniquenessExceptionSerializationTest { - @Rule - @JvmField - val testSerialization = SerializationEnvironmentRule() - - @Test - fun testSerializationRoundTrip() { - val txhash = SecureHash.randomSHA256() - val txHash2 = SecureHash.randomSHA256() - val dummyParty = Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public) - val stateHistory: Map = mapOf(StateRef(txhash, 0) to UniquenessProvider.ConsumingTx(txHash2, 1, dummyParty)) - val conflict = UniquenessProvider.Conflict(stateHistory) - val instance = UniquenessException(conflict) - - val instanceOnTheOtherSide = instance.serialize().deserialize() - - assertEquals(instance.error, instanceOnTheOtherSide.error) - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 4d761a6ded..cca607030d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -673,7 +673,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service") return notaryConfig.run { if (raft != null) { - val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) + val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.clock, services.monitoringService.metrics, raft) (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) } else if (bftSMaRt != null) { if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 97bdb9518f..d874e838e6 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -55,10 +55,11 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot object NodeNotary object NodeNotaryV1 : MappedSchema(schemaFamily = NodeNotary.javaClass, version = 1, - mappedTypes = listOf(PersistentUniquenessProvider.PersistentUniqueness::class.java, - PersistentUniquenessProvider.PersistentNotaryCommit::class.java, - RaftUniquenessProvider.RaftState::class.java, - BFTNonValidatingNotaryService.PersistedCommittedState::class.java + mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java, + PersistentUniquenessProvider.Request::class.java, + PersistentUniquenessProvider.CommittedState::class.java, + RaftUniquenessProvider.CommittedState::class.java, + BFTNonValidatingNotaryService.CommittedState::class.java )) // Required schemas are those used by internal Corda services diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 4602088edd..9862e47807 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -3,14 +3,11 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.SettableFuture import net.corda.core.contracts.StateRef -import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData import net.corda.core.flows.* -import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.NotaryService -import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -99,58 +96,52 @@ class BFTNonValidatingNotaryService( @Entity @Table(name = "${NODE_DATABASE_PREFIX}bft_committed_states") - class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty) - : PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party) + class CommittedState(id: PersistentStateRef, consumingTxHash: String) : PersistentUniquenessProvider.BaseComittedState(id, consumingTxHash) - private fun createMap(): AppendOnlyPersistentMap { + private fun createMap(): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val txId = it.id.txId + ?: throw IllegalStateException("DB returned null SecureHash transactionId") val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") - Pair(StateRef(txhash = SecureHash.parse(txId), index = index), - UniquenessProvider.ConsumingTx( - id = SecureHash.parse(it.consumingTxHash), - inputIndex = it.consumingIndex, - requestingParty = Party( - name = CordaX500Name.parse(it.party.name), - owningKey = Crypto.decodePublicKey(it.party.owningKey)))) - }, - toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx -> - PersistedCommittedState( - id = PersistentStateRef(txHash.toString(), index), - consumingTxHash = id.toString(), - consumingIndex = inputIndex, - party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(), - requestingParty.owningKey.encoded) + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + SecureHash.parse(it.consumingTxHash) ) }, - persistentEntityClass = PersistedCommittedState::class.java + toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> + CommittedState( + id = PersistentStateRef(txHash.toString(), index), + consumingTxHash = id.toString() + ) + }, + persistentEntityClass = CommittedState::class.java ) } private class Replica(config: BFTSMaRtConfig, replicaId: Int, - createMap: () -> AppendOnlyPersistentMap, + createMap: () -> AppendOnlyPersistentMap, services: ServiceHubInternal, notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) { override fun executeCommand(command: ByteArray): ByteArray { val commitRequest = command.deserialize() verifyRequest(commitRequest) - val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity) + val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature) return response.serialize().bytes } - private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse { + private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSMaRt.ReplicaResponse { return try { val id = transaction.id val inputs = transaction.inputs val notary = transaction.notary if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow) if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary) - commitInputStates(inputs, id, callerIdentity) + commitInputStates(inputs, id, callerIdentity.name, requestSignature) log.debug { "Inputs committed successfully, signing $id" } BFTSMaRt.ReplicaResponse.Signature(sign(id)) } catch (e: NotaryInternalException) { @@ -166,7 +157,6 @@ class BFTNonValidatingNotaryService( val transaction = commitRequest.payload.coreTransaction val notarisationRequest = NotarisationRequest(transaction.inputs, transaction.id) notarisationRequest.verifySignature(commitRequest.payload.requestSignature, commitRequest.callerIdentity) - // TODO: persist the signature for traceability. } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 24e3de6089..225dbc4e76 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -15,10 +15,10 @@ import bftsmart.tom.util.Extractor import net.corda.core.contracts.StateRef import net.corda.core.crypto.* import net.corda.core.flows.* +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.declaredField import net.corda.core.internal.toTypedArray -import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -30,6 +30,7 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Replica import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.currentDBSession import java.nio.file.Path import java.security.PublicKey import java.util.* @@ -169,8 +170,8 @@ object BFTSMaRt { */ abstract class Replica(config: BFTSMaRtConfig, replicaId: Int, - createMap: () -> AppendOnlyPersistentMap, + createMap: () -> AppendOnlyPersistentMap, protected val services: ServiceHubInternal, protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() { companion object { @@ -215,28 +216,40 @@ object BFTSMaRt { */ abstract fun executeCommand(command: ByteArray): ByteArray? - protected fun commitInputStates(states: List, txId: SecureHash, callerIdentity: Party) { + protected fun commitInputStates(states: List, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) { log.debug { "Attempting to commit inputs for transaction: $txId" } - val conflicts = mutableMapOf() + + val conflicts = mutableMapOf() services.database.transaction { + logRequest(txId, callerName, requestSignature) states.forEach { state -> commitLog[state]?.let { conflicts[state] = it } } if (conflicts.isEmpty()) { log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } - states.forEachIndexed { i, stateRef -> - val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) - commitLog[stateRef] = txInfo + states.forEach { stateRef -> + commitLog[stateRef] = txId } } else { log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" } - val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } val error = NotaryError.Conflict(txId, conflict) throw NotaryInternalException(error) } } } + private fun logRequest(txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) { + val request = PersistentUniquenessProvider.Request( + consumingTxHash = txId.toString(), + partyName = callerName.toString(), + requestSignature = requestSignature.serialize().bytes, + requestDate = services.clock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + /** Generates a signature over an arbitrary array of bytes. */ protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) } @@ -253,18 +266,25 @@ object BFTSMaRt { // - Add streaming to support large data sets. override fun getSnapshot(): ByteArray { // LinkedHashMap for deterministic serialisation - val m = LinkedHashMap() - services.database.transaction { - commitLog.allPersisted().forEach { m[it.first] = it.second } + val committedStates = LinkedHashMap() + val requests = services.database.transaction { + commitLog.allPersisted().forEach { committedStates[it.first] = it.second } + val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java) + criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java)) + session.createQuery(criteriaQuery).resultList } - return m.serialize().bytes + return (committedStates to requests).serialize().bytes } override fun installSnapshot(bytes: ByteArray) { - val m = bytes.deserialize>() + val (committedStates, requests) = bytes.deserialize, List>>() services.database.transaction { commitLog.clear() - commitLog.putAll(m) + commitLog.putAll(committedStates) + val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java) + deleteQuery.from(PersistentUniquenessProvider.Request::class.java) + session.createQuery(deleteQuery).executeUpdate() + requests.forEach { session.persist(it) } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt deleted file mode 100644 index 4cb2082fb4..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt +++ /dev/null @@ -1,106 +0,0 @@ -package net.corda.node.services.transactions - -import io.atomix.copycat.Command -import io.atomix.copycat.Query -import io.atomix.copycat.server.Commit -import io.atomix.copycat.server.Snapshottable -import io.atomix.copycat.server.StateMachine -import io.atomix.copycat.server.storage.snapshot.SnapshotReader -import io.atomix.copycat.server.storage.snapshot.SnapshotWriter -import net.corda.core.utilities.contextLogger -import net.corda.node.utilities.AppendOnlyPersistentMap -import net.corda.nodeapi.internal.persistence.CordaPersistence -import java.util.* - -/** - * A distributed map state machine that doesn't allow overriding values. The state machine is replicated - * across a Copycat Raft cluster. - * - * The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the - * new (or re-joining) cluster member. - */ -class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap, E, EK>) : StateMachine(), Snapshottable { - companion object { - private val log = contextLogger() - } - - object Commands { - class PutAll(val entries: Map) : Command> { - override fun compaction(): Command.CompactionMode { - // The FULL compaction mode retains the command in the log until it has been stored and applied on all - // servers in the cluster. Once the commit has been applied to a state machine and closed it may be - // removed from the log during minor or major compaction. - // - // Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on - // purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes. - // TODO: Cluster membership changes need testing. - // TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to - // setup a new cluster of the desired size and transfer the data. - return Command.CompactionMode.FULL - } - } - - class Size : Query - class Get(val key: K) : Query - } - - private val map = db.transaction { createMap() } - - /** Gets a value for the given [Commands.Get.key] */ - fun get(commit: Commit>): V? { - commit.use { - val key = it.operation().key - return db.transaction { map[key]?.second } - } - } - - /** - * Stores the given [Commands.PutAll.entries] if no entry key already exists. - * - * @return map containing conflicting entries - */ - fun put(commit: Commit>): Map { - commit.use { - val index = commit.index() - val conflicts = LinkedHashMap() - db.transaction { - val entries = commit.operation().entries - log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})") - for (key in entries.keys) map[key]?.let { conflicts[key] = it.second } - if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) }) - } - return conflicts - } - } - - fun size(commit: Commit): Int { - commit.use { _ -> - return db.transaction { map.size } - } - } - - /** - * Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the - * [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a - * fixed number of recently accessed entries to ever be kept in memory. - */ - override fun snapshot(writer: SnapshotWriter) { - db.transaction { - writer.writeInt(map.size) - map.allPersisted().forEach { writer.writeObject(it.first to it.second) } - } - } - - /** Reads entries from disk and adds them to [map]. */ - override fun install(reader: SnapshotReader) { - val size = reader.readInt() - db.transaction { - map.clear() - // TODO: read & put entries in batches - for (i in 1..size) { - val (key, value) = reader.readObject>>() - map[key] = value - } - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index a9c9e2d21e..5c6f382aab 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -2,18 +2,13 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.ComponentGroupEnum -import net.corda.core.flows.FlowSession -import net.corda.core.flows.NotaryFlow -import net.corda.core.flows.TransactionParts -import net.corda.core.flows.NotarisationPayload -import net.corda.core.flows.NotarisationRequest -import net.corda.core.internal.validateRequest +import net.corda.core.flows.* +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.services.TrustedAuthorityNotaryService -import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.ContractUpgradeFilteredTransaction +import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.NotaryChangeWireTransaction -import net.corda.core.utilities.unwrap class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) { /** @@ -25,13 +20,11 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut * undo the commit of the input states (the exact mechanism still needs to be worked out). */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { - return otherSideSession.receive().unwrap { payload -> - val transaction = payload.coreTransaction - val request = NotarisationRequest(transaction.inputs, transaction.id) - validateRequest(request, payload.requestSignature) - extractParts(transaction) - } + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { + val transaction = requestPayload.coreTransaction + val request = NotarisationRequest(transaction.inputs, transaction.id) + validateRequestSignature(request, requestPayload.requestSignature) + return extractParts(transaction) } private fun extractParts(tx: CoreTransaction): TransactionParts { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 02aca42b85..466f42af0d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -1,62 +1,67 @@ package net.corda.node.services.transactions import net.corda.core.contracts.StateRef -import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails -import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef +import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX -import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY -import org.hibernate.annotations.Type -import java.io.Serializable +import net.corda.nodeapi.internal.persistence.currentDBSession +import java.time.Clock +import java.time.Instant import java.util.* import javax.annotation.concurrent.ThreadSafe import javax.persistence.* /** A RDBMS backed Uniqueness provider */ @ThreadSafe -class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() { - +class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() { @MappedSuperclass - open class PersistentUniqueness( + open class BaseComittedState( @EmbeddedId - var id: PersistentStateRef = PersistentStateRef(), + val id: PersistentStateRef, @Column(name = "consuming_transaction_id") - var consumingTxHash: String = "", - - @Column(name = "consuming_input_index", length = 36) - var consumingIndex: Int = 0, - - @Embedded - var party: PersistentParty = PersistentParty() + val consumingTxHash: String ) - @Embeddable - data class PersistentParty( - @Column(name = "requesting_party_name") - var name: String = "", + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log") + @CordaSerializable + class Request( + @Id + @GeneratedValue(strategy = GenerationType.AUTO) + val id: Int = 0, - @Column(name = "requesting_party_key", length = 255) - @Type(type = "corda-wrapper-binary") - var owningKey: ByteArray = EMPTY_BYTE_ARRAY - ) : Serializable + @Column(name = "consuming_transaction_id") + val consumingTxHash: String, + + @Embedded + @Column(name = "requesting_party_name") + var partyName: String, + + @Lob + @Column(name = "request_signature") + val requestSignature: ByteArray, + + @Column(name = "request_timestamp") + var requestDate: Instant + ) @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log") - class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty) : - PersistentUniqueness(id, consumingTxHash, consumingIndex, party) - + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") + class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash) private class InnerState { val committedStates = createMap() @@ -66,7 +71,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok companion object { private val log = contextLogger() - fun createMap(): AppendOnlyPersistentMap = + fun createMap(): AppendOnlyPersistentMap = AppendOnlyPersistentMap( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { @@ -74,46 +79,58 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") - Pair(StateRef(txhash = SecureHash.parse(txId), index = index), - UniquenessProvider.ConsumingTx( - id = SecureHash.parse(it.consumingTxHash), - inputIndex = it.consumingIndex, - requestingParty = Party( - name = CordaX500Name.parse(it.party.name), - owningKey = Crypto.decodePublicKey(it.party.owningKey)))) + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + SecureHash.parse(it.consumingTxHash) + ) + }, - toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx -> - PersistentNotaryCommit( + toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> + CommittedState( id = PersistentStateRef(txHash.toString(), index), - consumingTxHash = id.toString(), - consumingIndex = inputIndex, - party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.encoded) + consumingTxHash = id.toString() ) }, - persistentEntityClass = PersistentNotaryCommit::class.java + persistentEntityClass = CommittedState::class.java ) } - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { + logRequest(txId, callerIdentity, requestSignature) + val conflict = commitStates(states, txId) + if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict)) + } + + private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { + val request = Request( + consumingTxHash = txId.toString(), + partyName = callerIdentity.name.toString(), + requestSignature = requestSignature.serialize().bytes, + requestDate = clock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + + private fun commitStates(states: List, txId: SecureHash): Map? { val conflict = mutex.locked { - val conflictingStates = LinkedHashMap() + val conflictingStates = LinkedHashMap() for (inputState in states) { - val consumingTx = committedStates.get(inputState) + val consumingTx = committedStates[inputState] if (consumingTx != null) conflictingStates[inputState] = consumingTx } if (conflictingStates.isNotEmpty()) { log.debug("Failure, input states already committed: ${conflictingStates.keys}") - val conflict = conflictingStates.mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) } conflict } else { - states.forEachIndexed { i, stateRef -> - committedStates[stateRef] = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) + states.forEach { stateRef -> + committedStates[stateRef] = txId } log.debug("Successfully committed all input states: $states") null } } - - if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict)) + return conflict } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt new file mode 100644 index 0000000000..414992e165 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt @@ -0,0 +1,261 @@ +package net.corda.node.services.transactions + +import io.atomix.catalyst.buffer.BufferInput +import io.atomix.catalyst.buffer.BufferOutput +import io.atomix.catalyst.serializer.Serializer +import io.atomix.catalyst.serializer.TypeSerializer +import io.atomix.copycat.Command +import io.atomix.copycat.Query +import io.atomix.copycat.server.Commit +import io.atomix.copycat.server.Snapshottable +import io.atomix.copycat.server.StateMachine +import io.atomix.copycat.server.storage.snapshot.SnapshotReader +import io.atomix.copycat.server.storage.snapshot.SnapshotWriter +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger +import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded +import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef +import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.currentDBSession +import java.time.Clock + +/** + * Notarised contract state commit log, replicated across a Copycat Raft cluster. + * + * Copycat ony supports in-memory state machines, so we back the state with JDBC tables. + * State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member. + */ +class RaftTransactionCommitLog( + val db: CordaPersistence, + val nodeClock: Clock, + createMap: () -> AppendOnlyPersistentMap, E, EK> +) : StateMachine(), Snapshottable { + object Commands { + class CommitTransaction( + val states: List, + val txId: SecureHash, + val requestingParty: String, + val requestSignature: ByteArray + ) : Command> { + override fun compaction(): Command.CompactionMode { + // The FULL compaction mode retains the command in the log until it has been stored and applied on all + // servers in the cluster. Once the commit has been applied to a state machine and closed it may be + // removed from the log during minor or major compaction. + // + // Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on + // purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes. + // TODO: Cluster membership changes need testing. + // TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to + // setup a new cluster of the desired size and transfer the data. + return Command.CompactionMode.FULL + } + } + + class Get(val key: StateRef) : Query + } + + private val map = db.transaction { createMap() } + + /** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */ + fun commitTransaction(raftCommit: Commit): Map { + raftCommit.use { + val index = it.index() + val conflicts = LinkedHashMap() + db.transaction { + val commitCommand = raftCommit.command() + logRequest(commitCommand) + val states = commitCommand.states + val txId = commitCommand.txId + log.debug("State machine commit: storing entries with keys (${states.joinToString()})") + for (state in states) { + map[state]?.let { conflicts[state] = it.second } + } + if (conflicts.isEmpty()) { + val entries = states.map { it to Pair(index, txId) }.toMap() + map.putAll(entries) + } + } + return conflicts + } + } + + private fun logRequest(commitCommand: RaftTransactionCommitLog.Commands.CommitTransaction) { + val request = PersistentUniquenessProvider.Request( + consumingTxHash = commitCommand.txId.toString(), + partyName = commitCommand.requestingParty, + requestSignature = commitCommand.requestSignature, + requestDate = nodeClock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + + /** Gets the consuming transaction id for a given state reference. */ + fun get(commit: Commit): SecureHash? { + commit.use { + val key = it.operation().key + return db.transaction { map[key]?.second } + } + } + + /** + * Writes out all committed state and notarisation request entries to disk. Note that this operation does not + * load all entries into memory, as the [SnapshotWriter] is using a disk-backed buffer internally, and iterating + * map entries results in only a fixed number of recently accessed entries to ever be kept in memory. + */ + override fun snapshot(writer: SnapshotWriter) { + db.transaction { + writer.writeInt(map.size) + map.allPersisted().forEach { + val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + writer.writeUnsignedShort(bytes.size) + writer.writeObject(bytes) + } + + val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java) + criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java)) + val results = session.createQuery(criteriaQuery).resultList + + writer.writeInt(results.size) + results.forEach { + val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + writer.writeUnsignedShort(bytes.size) + writer.writeObject(bytes) + } + } + } + + /** Reads entries from disk and populates the committed state and notarisation request tables. */ + override fun install(reader: SnapshotReader) { + val size = reader.readInt() + db.transaction { + map.clear() + // TODO: read & put entries in batches + for (i in 1..size) { + val bytes = ByteArray(reader.readUnsignedShort()) + reader.read(bytes) + val (key, value) = bytes.deserialize>>() + map[key] = value + } + // Clean notarisation request log + val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java) + deleteQuery.from(PersistentUniquenessProvider.Request::class.java) + session.createQuery(deleteQuery).executeUpdate() + // Load and populate request log + for (i in 1..reader.readInt()) { + val bytes = ByteArray(reader.readUnsignedShort()) + reader.read(bytes) + val request = bytes.deserialize() + session.persist(request) + } + } + } + + companion object { + private val log = contextLogger() + + // Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: + val serializer: Serializer by lazy { + Serializer().apply { + register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) { + object : TypeSerializer { + override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction, + buffer: BufferOutput>, + serializer: Serializer) { + buffer.writeUnsignedShort(obj.states.size) + with(serializer) { + obj.states.forEach { + writeObject(it, buffer) + } + writeObject(obj.txId, buffer) + } + buffer.writeString(obj.requestingParty) + buffer.writeInt(obj.requestSignature.size) + buffer.write(obj.requestSignature) + } + + override fun read(type: Class, + buffer: BufferInput>, + serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction { + val stateCount = buffer.readUnsignedShort() + val states = (1..stateCount).map { + serializer.readObject(buffer) + } + val txId = serializer.readObject(buffer) + val name = buffer.readString() + val signatureSize = buffer.readInt() + val signature = ByteArray(signatureSize) + buffer.read(signature) + return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature) + } + } + } + register(RaftTransactionCommitLog.Commands.Get::class.java) { + object : TypeSerializer { + override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput>, serializer: Serializer) { + serializer.writeObject(obj.key, buffer) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get { + val key = serializer.readObject(buffer) + return RaftTransactionCommitLog.Commands.Get(key) + } + + } + } + register(StateRef::class.java) { + object : TypeSerializer { + override fun write(obj: StateRef, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeString(obj.encoded()) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): StateRef { + return buffer.readString().parseStateRef() + } + } + } + registerAbstract(SecureHash::class.java) { + object : TypeSerializer { + override fun write(obj: SecureHash, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeUnsignedShort(obj.bytes.size) + buffer.write(obj.bytes) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): SecureHash { + val size = buffer.readUnsignedShort() + val bytes = ByteArray(size) + buffer.read(bytes) + return SecureHash.SHA256(bytes) + } + } + } + register(LinkedHashMap::class.java) { + object : TypeSerializer> { + override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeInt(obj.size) + obj.forEach { + with(serializer) { + writeObject(it.key, buffer) + writeObject(it.value, buffer) + } + } + } + + override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer): LinkedHashMap<*, *> { + return LinkedHashMap().apply { + repeat(buffer.readInt()) { + put(serializer.readObject(buffer), serializer.readObject(buffer)) + } + } + } + } + } + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index f9da07e825..22835d8666 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -2,10 +2,6 @@ package net.corda.node.services.transactions import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry -import io.atomix.catalyst.buffer.BufferInput -import io.atomix.catalyst.buffer.BufferOutput -import io.atomix.catalyst.serializer.Serializer -import io.atomix.catalyst.serializer.TypeSerializer import io.atomix.catalyst.transport.Address import io.atomix.catalyst.transport.Transport import io.atomix.catalyst.transport.netty.NettyTransport @@ -20,14 +16,14 @@ import io.atomix.copycat.server.storage.StorageLevel import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.node.services.UniquenessProvider -import net.corda.core.serialization.SerializationDefaults +import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.node.services.config.RaftConfig @@ -36,11 +32,14 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX -import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import java.nio.file.Path +import java.time.Clock import java.util.concurrent.CompletableFuture import javax.annotation.concurrent.ThreadSafe -import javax.persistence.* +import javax.persistence.Column +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.Table /** * A uniqueness provider that records committed input states in a distributed collection replicated and @@ -51,37 +50,49 @@ import javax.persistence.* * to the cluster leader to be actioned. */ @ThreadSafe -class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() { +class RaftUniquenessProvider( + private val transportConfiguration: NodeSSLConfiguration, + private val db: CordaPersistence, + private val clock: Clock, + private val metrics: MetricRegistry, + private val raftConfig: RaftConfig +) : UniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = contextLogger() - fun createMap(): AppendOnlyPersistentMap, RaftState, String> = + fun createMap(): AppendOnlyPersistentMap, CommittedState, PersistentStateRef> = AppendOnlyPersistentMap( - toPersistentEntityKey = { it }, + toPersistentEntityKey = { PersistentStateRef(it) }, fromPersistentEntity = { - Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))) + val txId = it.id.txId + ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + Pair(it.index, SecureHash.parse(it.value) as SecureHash)) + }, - toPersistentEntity = { k: String, v: Pair -> - RaftState().apply { - key = k - value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes - index = v.first - } + toPersistentEntity = { k: StateRef, v: Pair -> + CommittedState( + PersistentStateRef(k), + v.second.toString(), + v.first) + }, - persistentEntityClass = RaftState::class.java + persistentEntityClass = CommittedState::class.java ) + + fun StateRef.encoded() = "$txhash:$index" + fun String.parseStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) } } @Entity @Table(name = "${NODE_DATABASE_PREFIX}raft_committed_states") - class RaftState( - @Id - @Column(name = "id") - var key: String = "", - - @Lob - @Column(name = "state_value") - var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "state_index") + class CommittedState( + @EmbeddedId + val id: PersistentStateRef, + @Column(name = "consuming_transaction_id") + var value: String = "", + @Column(name = "raft_log_index") var index: Long = 0 ) @@ -100,41 +111,17 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur fun start() { log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}") val stateMachineFactory = { - DistributedImmutableMap(db, RaftUniquenessProvider.Companion::createMap) + RaftTransactionCommitLog(db, clock, RaftUniquenessProvider.Companion::createMap) } val address = raftConfig.nodeAddress.let { Address(it.host, it.port) } val storage = buildStorage(storagePath) val transport = buildTransport(transportConfiguration) - val serializer = Serializer().apply { - // Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: - register(DistributedImmutableMap.Commands.PutAll::class.java) { - object : TypeSerializer> { - override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>, - buffer: BufferOutput>, - serializer: Serializer) { - writeMap(obj.entries, buffer, serializer) - } - - override fun read(type: Class>, - buffer: BufferInput>, - serializer: Serializer): DistributedImmutableMap.Commands.PutAll { - return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer)) - } - } - } - register(LinkedHashMap::class.java) { - object : TypeSerializer> { - override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput>, serializer: Serializer) = writeMap(obj, buffer, serializer) - override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer) = readMap(buffer, serializer) - } - } - } server = CopycatServer.builder(address) .withStateMachine(stateMachineFactory) .withStorage(storage) .withServerTransport(transport) - .withSerializer(serializer) + .withSerializer(RaftTransactionCommitLog.serializer) .build() val serverFuture = if (raftConfig.clusterAddresses.isNotEmpty()) { @@ -151,7 +138,7 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur val client = CopycatClient.builder(address) .withTransport(transport) // TODO: use local transport for client-server communications .withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF) - .withSerializer(serializer) + .withSerializer(RaftTransactionCommitLog.serializer) .withRecoveryStrategy(RecoveryStrategies.RECOVER) .build() _clientFuture = serverFuture.thenCompose { client.connect(address) } @@ -200,50 +187,22 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur } - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { - val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) } - + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { log.debug("Attempting to commit input states: ${states.joinToString()}") - val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries)) + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction( + states, + txId, + callerIdentity.name.toString(), + requestSignature.serialize().bytes + ) val conflicts = client.submit(commitCommand).get() - if (conflicts.isNotEmpty()) { - val conflictingStates = decode(conflicts).mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } val error = NotaryError.Conflict(txId, conflictingStates) throw NotaryInternalException(error) } log.debug("All input states of transaction $txId have been committed") } - - /** - * Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray - * here to avoid having to define additional serializers for our custom types. - */ - private fun encode(items: List>): Map { - fun StateRef.encoded() = "$txhash:$index" - return items.map { it.first.encoded() to it.second.serialize().bytes }.toMap() - } - - private fun decode(items: Map): Map { - fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) } - return items.map { it.key.toStateRef() to it.value.deserialize() }.toMap() - } } -private fun writeMap(map: Map<*, *>, buffer: BufferOutput>, serializer: Serializer) = with(map) { - buffer.writeInt(size) - forEach { - with(serializer) { - writeObject(it.key, buffer) - writeObject(it.value, buffer) - } - } -} -private fun readMap(buffer: BufferInput>, serializer: Serializer): LinkedHashMap { - return LinkedHashMap().apply { - repeat(buffer.readInt()) { - put(serializer.readObject(buffer), serializer.readObject(buffer)) - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index f9ace1e95e..65f64d5a8c 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -8,7 +8,7 @@ import java.security.PublicKey /** A simple Notary service that does not perform transaction validation */ class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index 57edcfecaf..a2550ee1de 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -5,12 +5,11 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TransactionVerificationException import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow -import net.corda.core.internal.validateRequest +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction -import net.corda.core.utilities.unwrap import java.security.SignatureException /** @@ -25,9 +24,10 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor * the transaction in question has all required signatures apart from the notary's. */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { try { - val stx = receiveTransaction() + val stx = requestPayload.signedTransaction + validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature) val notary = stx.notary checkNotary(notary) resolveAndContractVerify(stx) @@ -43,15 +43,6 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor } } - @Suspendable - private fun receiveTransaction(): SignedTransaction { - return otherSideSession.receive().unwrap { - val stx = it.signedTransaction - validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature) - stx - } - } - @Suspendable private fun resolveAndContractVerify(stx: SignedTransaction) { subFlow(ResolveTransactionsFlow(stx, otherSideSession)) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 01da0911dd..24f33c539f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -8,7 +8,7 @@ import java.security.PublicKey /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this) diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index 55aa5d06e4..4d866546bc 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -1,9 +1,12 @@ package net.corda.node.services.transactions +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.NullKeys import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 -import net.corda.core.flows.NotaryInternalException +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryInternalException import net.corda.core.identity.CordaX500Name import net.corda.node.internal.configureDatabase import net.corda.node.services.schema.NodeSchemaService @@ -19,6 +22,7 @@ import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.time.Clock import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -28,6 +32,7 @@ class PersistentUniquenessProviderTests { val testSerialization = SerializationEnvironmentRule() private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val txID = SecureHash.randomSHA256() + private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) private lateinit var database: CordaPersistence @@ -46,23 +51,25 @@ class PersistentUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { database.transaction { - val provider = PersistentUniquenessProvider() + val provider = PersistentUniquenessProvider(Clock.systemUTC()) val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity) + provider.commit(listOf(inputState), txID, identity, requestSignature) } } @Test fun `should report a conflict for a transaction with previously used inputs`() { database.transaction { - val provider = PersistentUniquenessProvider() + val provider = PersistentUniquenessProvider(Clock.systemUTC()) val inputState = generateStateRef() val inputs = listOf(inputState) - provider.commit(inputs, txID, identity) + provider.commit(inputs, txID, identity, requestSignature) - val ex = assertFailsWith { provider.commit(inputs, txID, identity) } + val ex = assertFailsWith { + provider.commit(inputs, txID, identity, requestSignature) + } val error = ex.error as NotaryError.Conflict val conflictCause = error.consumedStates[inputState]!! diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt similarity index 65% rename from node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt rename to node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt index d8a7611cb1..e00d5b0c57 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt @@ -6,6 +6,8 @@ import io.atomix.copycat.client.CopycatClient import io.atomix.copycat.server.CopycatServer import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash import net.corda.core.internal.concurrent.asCordaFuture import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.NetworkHostAndPort @@ -14,20 +16,22 @@ import net.corda.node.internal.configureDatabase import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.internal.LogHelper +import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.freeLocalHostAndPort -import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.internal.LogHelper import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.time.Clock import java.util.concurrent.CompletableFuture import kotlin.test.assertEquals import kotlin.test.assertTrue -class DistributedImmutableMapTests { +class RaftTransactionCommitLogTests { data class Member(val client: CopycatClient, val server: CopycatServer) @Rule @@ -40,12 +44,13 @@ class DistributedImmutableMapTests { @Before fun setup() { LogHelper.setLevel("-org.apache.activemq") + LogHelper.setLevel("+io.atomix") cluster = setUpCluster() } @After fun tearDown() { - LogHelper.reset("org.apache.activemq") + LogHelper.reset("org.apache.activemq", "io.atomix") cluster.map { it.client.close().asCordaFuture() }.transpose().getOrThrow() cluster.map { it.server.shutdown().asCordaFuture() }.transpose().getOrThrow() databases.forEach { it.close() } @@ -55,28 +60,38 @@ class DistributedImmutableMapTests { fun `stores entries correctly`() { val client = cluster.last().client - val entries = mapOf("key1" to "value1", "key2" to "value2") + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) - val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) + val conflict = client.submit(commitCommand).getOrThrow() assertTrue { conflict.isEmpty() } - val value1 = client.submit(DistributedImmutableMap.Commands.Get("key1")) - val value2 = client.submit(DistributedImmutableMap.Commands.Get("key2")) + val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0])) + val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1])) - assertEquals(value1.getOrThrow(), "value1") - assertEquals(value2.getOrThrow(), "value2") + assertEquals(value1.getOrThrow(), txId) + assertEquals(value2.getOrThrow(), txId) } @Test fun `returns conflict for duplicate entries`() { val client = cluster.last().client - val entries = mapOf("key1" to "value1", "key2" to "value2") + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) - var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) + var conflict = client.submit(commitCommand).getOrThrow() assertTrue { conflict.isEmpty() } - conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() - assertTrue { conflict == entries } + + conflict = client.submit(commitCommand).getOrThrow() + assertEquals(conflict.keys, states.toSet()) + conflict.forEach { assertEquals(it.value, txId) } } private fun setUpCluster(nodeCount: Int = 3): List { @@ -91,11 +106,12 @@ class DistributedImmutableMapTests { val address = Address(myAddress.host, myAddress.port) val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(serverNameTablePrefix = "PORT_${myAddress.port}_"), rigorousMock(), NodeSchemaService(includeNotarySchemas = true)) databases.add(database) - val stateMachineFactory = { DistributedImmutableMap(database, RaftUniquenessProvider.Companion::createMap) } + val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) } val server = CopycatServer.builder(address) .withStateMachine(stateMachineFactory) .withStorage(storage) + .withSerializer(RaftTransactionCommitLog.serializer) .build() val serverInitFuture = if (clusterAddress != null) { @@ -107,6 +123,7 @@ class DistributedImmutableMapTests { val client = CopycatClient.builder(address) .withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF) + .withSerializer(RaftTransactionCommitLog.serializer) .build() return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) } } diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index 18f2c2c6be..fc58760a5c 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -5,14 +5,13 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TransactionVerificationException import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow -import net.corda.core.internal.validateRequest +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction -import net.corda.core.utilities.unwrap import net.corda.node.services.transactions.PersistentUniquenessProvider import java.security.PublicKey import java.security.SignatureException @@ -26,7 +25,7 @@ import java.security.SignatureException // START 1 @CordaService class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this) @@ -43,9 +42,10 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating * transaction dependency chain. */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { try { - val stx = receiveTransaction() + val stx = requestPayload.signedTransaction + validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature) val notary = stx.notary checkNotary(notary) verifySignatures(stx) @@ -61,15 +61,6 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating } } - @Suspendable - private fun receiveTransaction(): SignedTransaction { - return otherSideSession.receive().unwrap { - val stx = it.signedTransaction - validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature) - stx - } - } - @Suspendable private fun resolveAndContractVerify(stx: SignedTransaction) { subFlow(ResolveTransactionsFlow(stx, otherSideSession)) From 64871ac0246716e2b90fe1a1809108825d57c45c Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Thu, 15 Mar 2018 14:40:22 +0000 Subject: [PATCH 3/3] Fix `IRSSimulationTest` test (#2821) * Fix `IRSSimulationTest` test Since prod class `IRSSimulation` loading `trade.json` as resource, `trade.json` should be located in prod resources folder of a dependant project to be successfully loaded. Also improve error reporting. * Change of Json trades samples locations --- .../test/kotlin/net/corda/irs/web/demo/IrsDemoClientApi.kt | 2 +- .../main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt | 4 +++- .../main}/resources/net/corda/irs/web/simulation/trade.json | 0 3 files changed, 4 insertions(+), 2 deletions(-) rename samples/{irs-demo/web/src/test => network-visualiser/src/main}/resources/net/corda/irs/web/simulation/trade.json (100%) diff --git a/samples/irs-demo/web/src/test/kotlin/net/corda/irs/web/demo/IrsDemoClientApi.kt b/samples/irs-demo/web/src/test/kotlin/net/corda/irs/web/demo/IrsDemoClientApi.kt index 9bebc4481c..eb51e47470 100644 --- a/samples/irs-demo/web/src/test/kotlin/net/corda/irs/web/demo/IrsDemoClientApi.kt +++ b/samples/irs-demo/web/src/test/kotlin/net/corda/irs/web/demo/IrsDemoClientApi.kt @@ -12,7 +12,7 @@ class IRSDemoClientApi(hostAndPort: NetworkHostAndPort) { private val api = HttpApi.fromHostAndPort(hostAndPort, apiRoot) fun runTrade(tradeId: String, oracleName: CordaX500Name) { - val fileContents = IOUtils.toString(javaClass.classLoader.getResourceAsStream("net/corda/irs/simulation/example-irs-trade.json"), Charsets.UTF_8.name()) + val fileContents = IOUtils.toString(javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/example-irs-trade.json"), Charsets.UTF_8.name()) val tradeFile = fileContents.replace("tradeXXX", tradeId).replace("oracleXXX", oracleName.toString()) api.postJson("deals", tradeFile) } diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt index 1ad0942057..aeab972aca 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/IRSSimulation.kt @@ -129,7 +129,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten // have the convenient copy() method that'd let us make small adjustments. Instead they're partly mutable. // TODO: We should revisit this in post-Excalibur cleanup and fix, e.g. by introducing an interface. - val irs = om.readValue(javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/trade.json") + val resourceAsStream = javaClass.classLoader.getResourceAsStream("net/corda/irs/web/simulation/trade.json") + ?: error("Trade representation cannot be loaded.") + val irs = om.readValue(resourceAsStream .reader() .readText() .replace("oracleXXX", RatesOracleNode.RATES_SERVICE_NAME.toString())) diff --git a/samples/irs-demo/web/src/test/resources/net/corda/irs/web/simulation/trade.json b/samples/network-visualiser/src/main/resources/net/corda/irs/web/simulation/trade.json similarity index 100% rename from samples/irs-demo/web/src/test/resources/net/corda/irs/web/simulation/trade.json rename to samples/network-visualiser/src/main/resources/net/corda/irs/web/simulation/trade.json