diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 8a30c60338..bf524bd54a 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1460,7 +1460,7 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net @co.paralleluniverse.fibers.Suspendable protected final void checkNotary(net.corda.core.identity.Party) @org.jetbrains.annotations.NotNull public final net.corda.core.flows.FlowSession getOtherSideSession() @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService() - @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.TransactionParts receiveAndVerifyTx() + @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload) ## public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic public (net.corda.core.flows.FlowSession) @@ -2066,7 +2066,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l ## @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService public () - public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) + public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) @org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog() @org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider() @@ -2082,7 +2082,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError() ## public interface net.corda.core.node.services.UniquenessProvider - public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) + public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object public (Map) diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index 2c3ef48413..2fd9dbf4b0 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -127,7 +127,7 @@ class NotaryFlow { * It checks that the time-window command is valid (if present) and commits the input state, or returns a conflict * if any of the input states have been previously committed. * - * Additional transaction validation logic can be added when implementing [receiveAndVerifyTx]. + * Additional transaction validation logic can be added when implementing [validateRequest]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic() { @@ -137,13 +137,15 @@ class NotaryFlow { check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) { "We are not a notary on the network" } + + val requestPayload = otherSideSession.receive().unwrap { it } var txId: SecureHash? = null try { - val parts = receiveAndVerifyTx() + val parts = validateRequest(requestPayload) txId = parts.id checkNotary(parts.notary) service.validateTimeWindow(parts.timestamp) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty) + service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature) signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) @@ -152,11 +154,10 @@ class NotaryFlow { } /** - * Implement custom logic to receive the transaction to notarise, and perform verification based on validity and - * privacy requirements. + * Implement custom logic to perform transaction verification based on validity and privacy requirements. */ @Suspendable - abstract fun receiveAndVerifyTx(): TransactionParts + protected abstract fun validateRequest(requestPayload: NotarisationPayload): TransactionParts /** Check if transaction is intended to be signed by this notary. */ @Suspendable diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index cfde75e90a..73e6d7819d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -397,10 +397,9 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade } /** Verifies that the correct notarisation request was signed by the counterparty. */ -fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) { +fun NotaryFlow.Service.validateRequestSignature(request: NotarisationRequest, signature: NotarisationRequestSignature) { val requestingParty = otherSideSession.counterparty request.verifySignature(signature, requestingParty) - // TODO: persist the signature for traceability. Do we need to persist the request as well? } /** Creates a signature over the notarisation request using the legal identity key. */ diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt index 0cb1e9fcf6..c7aa072214 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -79,9 +79,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that * this method does not throw an exception when input states are present multiple times within the transaction. */ - fun commitInputStates(inputs: List, txId: SecureHash, caller: Party) { + fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) { try { - uniquenessProvider.commit(inputs, txId, caller) + uniquenessProvider.commit(inputs, txId, caller, requestSignature) } catch (e: NotaryInternalException) { if (e.error is NotaryError.Conflict) { val conflicts = inputs.filterIndexed { _, stateRef -> diff --git a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt index 2873d22d74..5a2950d0d4 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt @@ -3,6 +3,7 @@ package net.corda.core.node.services import net.corda.core.CordaException import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable @@ -14,11 +15,12 @@ import net.corda.core.serialization.CordaSerializable */ interface UniquenessProvider { /** Commits all input states of the given transaction. */ - fun commit(states: List, txId: SecureHash, callerIdentity: Party) + fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) /** Specifies the consuming transaction for every conflicting state. */ @CordaSerializable @Deprecated("No longer used due to potential privacy leak") + @Suppress("DEPRECATION") data class Conflict(val stateHistory: Map) /** @@ -26,6 +28,7 @@ interface UniquenessProvider { * the caller identity requesting the commit. */ @CordaSerializable + @Deprecated("No longer used") data class ConsumingTx(val id: SecureHash, val inputIndex: Int, val requestingParty: Party) } diff --git a/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt new file mode 100644 index 0000000000..b1cd4e7d4f --- /dev/null +++ b/core/src/test/kotlin/net/corda/core/serialization/NotaryExceptionSerializationTest.kt @@ -0,0 +1,31 @@ +package net.corda.core.serialization + +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryException +import net.corda.core.flows.StateConsumptionDetails +import net.corda.testing.core.SerializationEnvironmentRule +import org.junit.Rule +import org.junit.Test +import kotlin.test.assertEquals + +class NotaryExceptionSerializationTest { + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + @Test + fun testSerializationRoundTrip() { + val txhash = SecureHash.randomSHA256() + val stateHistory: Map = mapOf( + StateRef(txhash, 0) to StateConsumptionDetails(txhash.sha256()) + ) + val error = NotaryError.Conflict(txhash, stateHistory) + val instance = NotaryException(error) + val instanceOnTheOtherSide = instance.serialize().bytes.deserialize() + + assertEquals(instance.error, instanceOnTheOtherSide.error) + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt deleted file mode 100644 index f1ac6956a3..0000000000 --- a/core/src/test/kotlin/net/corda/core/serialization/UniquenessExceptionSerializationTest.kt +++ /dev/null @@ -1,33 +0,0 @@ -package net.corda.core.serialization - -import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.generateKeyPair -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.node.services.UniquenessException -import net.corda.core.node.services.UniquenessProvider -import net.corda.testing.core.SerializationEnvironmentRule -import org.junit.Rule -import org.junit.Test -import kotlin.test.assertEquals - -class UniquenessExceptionSerializationTest { - @Rule - @JvmField - val testSerialization = SerializationEnvironmentRule() - - @Test - fun testSerializationRoundTrip() { - val txhash = SecureHash.randomSHA256() - val txHash2 = SecureHash.randomSHA256() - val dummyParty = Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public) - val stateHistory: Map = mapOf(StateRef(txhash, 0) to UniquenessProvider.ConsumingTx(txHash2, 1, dummyParty)) - val conflict = UniquenessProvider.Conflict(stateHistory) - val instance = UniquenessException(conflict) - - val instanceOnTheOtherSide = instance.serialize().deserialize() - - assertEquals(instance.error, instanceOnTheOtherSide.error) - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 4d761a6ded..cca607030d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -673,7 +673,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service") return notaryConfig.run { if (raft != null) { - val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.monitoringService.metrics, raft) + val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.clock, services.monitoringService.metrics, raft) (if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider) } else if (bftSMaRt != null) { if (validating) throw IllegalArgumentException("Validating BFTSMaRt notary not supported") diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 97bdb9518f..d874e838e6 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -55,10 +55,11 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot object NodeNotary object NodeNotaryV1 : MappedSchema(schemaFamily = NodeNotary.javaClass, version = 1, - mappedTypes = listOf(PersistentUniquenessProvider.PersistentUniqueness::class.java, - PersistentUniquenessProvider.PersistentNotaryCommit::class.java, - RaftUniquenessProvider.RaftState::class.java, - BFTNonValidatingNotaryService.PersistedCommittedState::class.java + mappedTypes = listOf(PersistentUniquenessProvider.BaseComittedState::class.java, + PersistentUniquenessProvider.Request::class.java, + PersistentUniquenessProvider.CommittedState::class.java, + RaftUniquenessProvider.CommittedState::class.java, + BFTNonValidatingNotaryService.CommittedState::class.java )) // Required schemas are those used by internal Corda services diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 4602088edd..9862e47807 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -3,14 +3,11 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.SettableFuture import net.corda.core.contracts.StateRef -import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData import net.corda.core.flows.* -import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.NotaryService -import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -99,58 +96,52 @@ class BFTNonValidatingNotaryService( @Entity @Table(name = "${NODE_DATABASE_PREFIX}bft_committed_states") - class PersistedCommittedState(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentUniquenessProvider.PersistentParty) - : PersistentUniquenessProvider.PersistentUniqueness(id, consumingTxHash, consumingIndex, party) + class CommittedState(id: PersistentStateRef, consumingTxHash: String) : PersistentUniquenessProvider.BaseComittedState(id, consumingTxHash) - private fun createMap(): AppendOnlyPersistentMap { + private fun createMap(): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val txId = it.id.txId + ?: throw IllegalStateException("DB returned null SecureHash transactionId") val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") - Pair(StateRef(txhash = SecureHash.parse(txId), index = index), - UniquenessProvider.ConsumingTx( - id = SecureHash.parse(it.consumingTxHash), - inputIndex = it.consumingIndex, - requestingParty = Party( - name = CordaX500Name.parse(it.party.name), - owningKey = Crypto.decodePublicKey(it.party.owningKey)))) - }, - toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx -> - PersistedCommittedState( - id = PersistentStateRef(txHash.toString(), index), - consumingTxHash = id.toString(), - consumingIndex = inputIndex, - party = PersistentUniquenessProvider.PersistentParty(requestingParty.name.toString(), - requestingParty.owningKey.encoded) + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + SecureHash.parse(it.consumingTxHash) ) }, - persistentEntityClass = PersistedCommittedState::class.java + toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> + CommittedState( + id = PersistentStateRef(txHash.toString(), index), + consumingTxHash = id.toString() + ) + }, + persistentEntityClass = CommittedState::class.java ) } private class Replica(config: BFTSMaRtConfig, replicaId: Int, - createMap: () -> AppendOnlyPersistentMap, + createMap: () -> AppendOnlyPersistentMap, services: ServiceHubInternal, notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) { override fun executeCommand(command: ByteArray): ByteArray { val commitRequest = command.deserialize() verifyRequest(commitRequest) - val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity) + val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature) return response.serialize().bytes } - private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party): BFTSMaRt.ReplicaResponse { + private fun verifyAndCommitTx(transaction: CoreTransaction, callerIdentity: Party, requestSignature: NotarisationRequestSignature): BFTSMaRt.ReplicaResponse { return try { val id = transaction.id val inputs = transaction.inputs val notary = transaction.notary if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow) if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary) - commitInputStates(inputs, id, callerIdentity) + commitInputStates(inputs, id, callerIdentity.name, requestSignature) log.debug { "Inputs committed successfully, signing $id" } BFTSMaRt.ReplicaResponse.Signature(sign(id)) } catch (e: NotaryInternalException) { @@ -166,7 +157,6 @@ class BFTNonValidatingNotaryService( val transaction = commitRequest.payload.coreTransaction val notarisationRequest = NotarisationRequest(transaction.inputs, transaction.id) notarisationRequest.verifySignature(commitRequest.payload.requestSignature, commitRequest.callerIdentity) - // TODO: persist the signature for traceability. } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 24e3de6089..225dbc4e76 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -15,10 +15,10 @@ import bftsmart.tom.util.Extractor import net.corda.core.contracts.StateRef import net.corda.core.crypto.* import net.corda.core.flows.* +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.declaredField import net.corda.core.internal.toTypedArray -import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -30,6 +30,7 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Replica import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.currentDBSession import java.nio.file.Path import java.security.PublicKey import java.util.* @@ -169,8 +170,8 @@ object BFTSMaRt { */ abstract class Replica(config: BFTSMaRtConfig, replicaId: Int, - createMap: () -> AppendOnlyPersistentMap, + createMap: () -> AppendOnlyPersistentMap, protected val services: ServiceHubInternal, protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() { companion object { @@ -215,28 +216,40 @@ object BFTSMaRt { */ abstract fun executeCommand(command: ByteArray): ByteArray? - protected fun commitInputStates(states: List, txId: SecureHash, callerIdentity: Party) { + protected fun commitInputStates(states: List, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) { log.debug { "Attempting to commit inputs for transaction: $txId" } - val conflicts = mutableMapOf() + + val conflicts = mutableMapOf() services.database.transaction { + logRequest(txId, callerName, requestSignature) states.forEach { state -> commitLog[state]?.let { conflicts[state] = it } } if (conflicts.isEmpty()) { log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } - states.forEachIndexed { i, stateRef -> - val txInfo = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) - commitLog[stateRef] = txInfo + states.forEach { stateRef -> + commitLog[stateRef] = txId } } else { log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" } - val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } val error = NotaryError.Conflict(txId, conflict) throw NotaryInternalException(error) } } } + private fun logRequest(txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) { + val request = PersistentUniquenessProvider.Request( + consumingTxHash = txId.toString(), + partyName = callerName.toString(), + requestSignature = requestSignature.serialize().bytes, + requestDate = services.clock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + /** Generates a signature over an arbitrary array of bytes. */ protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) } @@ -253,18 +266,25 @@ object BFTSMaRt { // - Add streaming to support large data sets. override fun getSnapshot(): ByteArray { // LinkedHashMap for deterministic serialisation - val m = LinkedHashMap() - services.database.transaction { - commitLog.allPersisted().forEach { m[it.first] = it.second } + val committedStates = LinkedHashMap() + val requests = services.database.transaction { + commitLog.allPersisted().forEach { committedStates[it.first] = it.second } + val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java) + criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java)) + session.createQuery(criteriaQuery).resultList } - return m.serialize().bytes + return (committedStates to requests).serialize().bytes } override fun installSnapshot(bytes: ByteArray) { - val m = bytes.deserialize>() + val (committedStates, requests) = bytes.deserialize, List>>() services.database.transaction { commitLog.clear() - commitLog.putAll(m) + commitLog.putAll(committedStates) + val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java) + deleteQuery.from(PersistentUniquenessProvider.Request::class.java) + session.createQuery(deleteQuery).executeUpdate() + requests.forEach { session.persist(it) } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt deleted file mode 100644 index 4cb2082fb4..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt +++ /dev/null @@ -1,106 +0,0 @@ -package net.corda.node.services.transactions - -import io.atomix.copycat.Command -import io.atomix.copycat.Query -import io.atomix.copycat.server.Commit -import io.atomix.copycat.server.Snapshottable -import io.atomix.copycat.server.StateMachine -import io.atomix.copycat.server.storage.snapshot.SnapshotReader -import io.atomix.copycat.server.storage.snapshot.SnapshotWriter -import net.corda.core.utilities.contextLogger -import net.corda.node.utilities.AppendOnlyPersistentMap -import net.corda.nodeapi.internal.persistence.CordaPersistence -import java.util.* - -/** - * A distributed map state machine that doesn't allow overriding values. The state machine is replicated - * across a Copycat Raft cluster. - * - * The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the - * new (or re-joining) cluster member. - */ -class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap, E, EK>) : StateMachine(), Snapshottable { - companion object { - private val log = contextLogger() - } - - object Commands { - class PutAll(val entries: Map) : Command> { - override fun compaction(): Command.CompactionMode { - // The FULL compaction mode retains the command in the log until it has been stored and applied on all - // servers in the cluster. Once the commit has been applied to a state machine and closed it may be - // removed from the log during minor or major compaction. - // - // Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on - // purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes. - // TODO: Cluster membership changes need testing. - // TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to - // setup a new cluster of the desired size and transfer the data. - return Command.CompactionMode.FULL - } - } - - class Size : Query - class Get(val key: K) : Query - } - - private val map = db.transaction { createMap() } - - /** Gets a value for the given [Commands.Get.key] */ - fun get(commit: Commit>): V? { - commit.use { - val key = it.operation().key - return db.transaction { map[key]?.second } - } - } - - /** - * Stores the given [Commands.PutAll.entries] if no entry key already exists. - * - * @return map containing conflicting entries - */ - fun put(commit: Commit>): Map { - commit.use { - val index = commit.index() - val conflicts = LinkedHashMap() - db.transaction { - val entries = commit.operation().entries - log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})") - for (key in entries.keys) map[key]?.let { conflicts[key] = it.second } - if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) }) - } - return conflicts - } - } - - fun size(commit: Commit): Int { - commit.use { _ -> - return db.transaction { map.size } - } - } - - /** - * Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the - * [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a - * fixed number of recently accessed entries to ever be kept in memory. - */ - override fun snapshot(writer: SnapshotWriter) { - db.transaction { - writer.writeInt(map.size) - map.allPersisted().forEach { writer.writeObject(it.first to it.second) } - } - } - - /** Reads entries from disk and adds them to [map]. */ - override fun install(reader: SnapshotReader) { - val size = reader.readInt() - db.transaction { - map.clear() - // TODO: read & put entries in batches - for (i in 1..size) { - val (key, value) = reader.readObject>>() - map[key] = value - } - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index a9c9e2d21e..5c6f382aab 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -2,18 +2,13 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.ComponentGroupEnum -import net.corda.core.flows.FlowSession -import net.corda.core.flows.NotaryFlow -import net.corda.core.flows.TransactionParts -import net.corda.core.flows.NotarisationPayload -import net.corda.core.flows.NotarisationRequest -import net.corda.core.internal.validateRequest +import net.corda.core.flows.* +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.services.TrustedAuthorityNotaryService -import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.ContractUpgradeFilteredTransaction +import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.NotaryChangeWireTransaction -import net.corda.core.utilities.unwrap class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) { /** @@ -25,13 +20,11 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut * undo the commit of the input states (the exact mechanism still needs to be worked out). */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { - return otherSideSession.receive().unwrap { payload -> - val transaction = payload.coreTransaction - val request = NotarisationRequest(transaction.inputs, transaction.id) - validateRequest(request, payload.requestSignature) - extractParts(transaction) - } + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { + val transaction = requestPayload.coreTransaction + val request = NotarisationRequest(transaction.inputs, transaction.id) + validateRequestSignature(request, requestPayload.requestSignature) + return extractParts(transaction) } private fun extractParts(tx: CoreTransaction): TransactionParts { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 02aca42b85..466f42af0d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -1,62 +1,67 @@ package net.corda.node.services.transactions import net.corda.core.contracts.StateRef -import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails -import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef +import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX -import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY -import org.hibernate.annotations.Type -import java.io.Serializable +import net.corda.nodeapi.internal.persistence.currentDBSession +import java.time.Clock +import java.time.Instant import java.util.* import javax.annotation.concurrent.ThreadSafe import javax.persistence.* /** A RDBMS backed Uniqueness provider */ @ThreadSafe -class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() { - +class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() { @MappedSuperclass - open class PersistentUniqueness( + open class BaseComittedState( @EmbeddedId - var id: PersistentStateRef = PersistentStateRef(), + val id: PersistentStateRef, @Column(name = "consuming_transaction_id") - var consumingTxHash: String = "", - - @Column(name = "consuming_input_index", length = 36) - var consumingIndex: Int = 0, - - @Embedded - var party: PersistentParty = PersistentParty() + val consumingTxHash: String ) - @Embeddable - data class PersistentParty( - @Column(name = "requesting_party_name") - var name: String = "", + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_request_log") + @CordaSerializable + class Request( + @Id + @GeneratedValue(strategy = GenerationType.AUTO) + val id: Int = 0, - @Column(name = "requesting_party_key", length = 255) - @Type(type = "corda-wrapper-binary") - var owningKey: ByteArray = EMPTY_BYTE_ARRAY - ) : Serializable + @Column(name = "consuming_transaction_id") + val consumingTxHash: String, + + @Embedded + @Column(name = "requesting_party_name") + var partyName: String, + + @Lob + @Column(name = "request_signature") + val requestSignature: ByteArray, + + @Column(name = "request_timestamp") + var requestDate: Instant + ) @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log") - class PersistentNotaryCommit(id: PersistentStateRef, consumingTxHash: String, consumingIndex: Int, party: PersistentParty) : - PersistentUniqueness(id, consumingTxHash, consumingIndex, party) - + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") + class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash) private class InnerState { val committedStates = createMap() @@ -66,7 +71,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok companion object { private val log = contextLogger() - fun createMap(): AppendOnlyPersistentMap = + fun createMap(): AppendOnlyPersistentMap = AppendOnlyPersistentMap( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { @@ -74,46 +79,58 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok val txId = it.id.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") - Pair(StateRef(txhash = SecureHash.parse(txId), index = index), - UniquenessProvider.ConsumingTx( - id = SecureHash.parse(it.consumingTxHash), - inputIndex = it.consumingIndex, - requestingParty = Party( - name = CordaX500Name.parse(it.party.name), - owningKey = Crypto.decodePublicKey(it.party.owningKey)))) + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + SecureHash.parse(it.consumingTxHash) + ) + }, - toPersistentEntity = { (txHash, index): StateRef, (id, inputIndex, requestingParty): UniquenessProvider.ConsumingTx -> - PersistentNotaryCommit( + toPersistentEntity = { (txHash, index): StateRef, id: SecureHash -> + CommittedState( id = PersistentStateRef(txHash.toString(), index), - consumingTxHash = id.toString(), - consumingIndex = inputIndex, - party = PersistentParty(requestingParty.name.toString(), requestingParty.owningKey.encoded) + consumingTxHash = id.toString() ) }, - persistentEntityClass = PersistentNotaryCommit::class.java + persistentEntityClass = CommittedState::class.java ) } - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { + logRequest(txId, callerIdentity, requestSignature) + val conflict = commitStates(states, txId) + if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict)) + } + + private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { + val request = Request( + consumingTxHash = txId.toString(), + partyName = callerIdentity.name.toString(), + requestSignature = requestSignature.serialize().bytes, + requestDate = clock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + + private fun commitStates(states: List, txId: SecureHash): Map? { val conflict = mutex.locked { - val conflictingStates = LinkedHashMap() + val conflictingStates = LinkedHashMap() for (inputState in states) { - val consumingTx = committedStates.get(inputState) + val consumingTx = committedStates[inputState] if (consumingTx != null) conflictingStates[inputState] = consumingTx } if (conflictingStates.isNotEmpty()) { log.debug("Failure, input states already committed: ${conflictingStates.keys}") - val conflict = conflictingStates.mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) } conflict } else { - states.forEachIndexed { i, stateRef -> - committedStates[stateRef] = UniquenessProvider.ConsumingTx(txId, i, callerIdentity) + states.forEach { stateRef -> + committedStates[stateRef] = txId } log.debug("Successfully committed all input states: $states") null } } - - if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict)) + return conflict } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt new file mode 100644 index 0000000000..414992e165 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt @@ -0,0 +1,261 @@ +package net.corda.node.services.transactions + +import io.atomix.catalyst.buffer.BufferInput +import io.atomix.catalyst.buffer.BufferOutput +import io.atomix.catalyst.serializer.Serializer +import io.atomix.catalyst.serializer.TypeSerializer +import io.atomix.copycat.Command +import io.atomix.copycat.Query +import io.atomix.copycat.server.Commit +import io.atomix.copycat.server.Snapshottable +import io.atomix.copycat.server.StateMachine +import io.atomix.copycat.server.storage.snapshot.SnapshotReader +import io.atomix.copycat.server.storage.snapshot.SnapshotWriter +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger +import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded +import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef +import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.currentDBSession +import java.time.Clock + +/** + * Notarised contract state commit log, replicated across a Copycat Raft cluster. + * + * Copycat ony supports in-memory state machines, so we back the state with JDBC tables. + * State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member. + */ +class RaftTransactionCommitLog( + val db: CordaPersistence, + val nodeClock: Clock, + createMap: () -> AppendOnlyPersistentMap, E, EK> +) : StateMachine(), Snapshottable { + object Commands { + class CommitTransaction( + val states: List, + val txId: SecureHash, + val requestingParty: String, + val requestSignature: ByteArray + ) : Command> { + override fun compaction(): Command.CompactionMode { + // The FULL compaction mode retains the command in the log until it has been stored and applied on all + // servers in the cluster. Once the commit has been applied to a state machine and closed it may be + // removed from the log during minor or major compaction. + // + // Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on + // purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes. + // TODO: Cluster membership changes need testing. + // TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to + // setup a new cluster of the desired size and transfer the data. + return Command.CompactionMode.FULL + } + } + + class Get(val key: StateRef) : Query + } + + private val map = db.transaction { createMap() } + + /** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */ + fun commitTransaction(raftCommit: Commit): Map { + raftCommit.use { + val index = it.index() + val conflicts = LinkedHashMap() + db.transaction { + val commitCommand = raftCommit.command() + logRequest(commitCommand) + val states = commitCommand.states + val txId = commitCommand.txId + log.debug("State machine commit: storing entries with keys (${states.joinToString()})") + for (state in states) { + map[state]?.let { conflicts[state] = it.second } + } + if (conflicts.isEmpty()) { + val entries = states.map { it to Pair(index, txId) }.toMap() + map.putAll(entries) + } + } + return conflicts + } + } + + private fun logRequest(commitCommand: RaftTransactionCommitLog.Commands.CommitTransaction) { + val request = PersistentUniquenessProvider.Request( + consumingTxHash = commitCommand.txId.toString(), + partyName = commitCommand.requestingParty, + requestSignature = commitCommand.requestSignature, + requestDate = nodeClock.instant() + ) + val session = currentDBSession() + session.persist(request) + } + + /** Gets the consuming transaction id for a given state reference. */ + fun get(commit: Commit): SecureHash? { + commit.use { + val key = it.operation().key + return db.transaction { map[key]?.second } + } + } + + /** + * Writes out all committed state and notarisation request entries to disk. Note that this operation does not + * load all entries into memory, as the [SnapshotWriter] is using a disk-backed buffer internally, and iterating + * map entries results in only a fixed number of recently accessed entries to ever be kept in memory. + */ + override fun snapshot(writer: SnapshotWriter) { + db.transaction { + writer.writeInt(map.size) + map.allPersisted().forEach { + val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + writer.writeUnsignedShort(bytes.size) + writer.writeObject(bytes) + } + + val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java) + criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java)) + val results = session.createQuery(criteriaQuery).resultList + + writer.writeInt(results.size) + results.forEach { + val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + writer.writeUnsignedShort(bytes.size) + writer.writeObject(bytes) + } + } + } + + /** Reads entries from disk and populates the committed state and notarisation request tables. */ + override fun install(reader: SnapshotReader) { + val size = reader.readInt() + db.transaction { + map.clear() + // TODO: read & put entries in batches + for (i in 1..size) { + val bytes = ByteArray(reader.readUnsignedShort()) + reader.read(bytes) + val (key, value) = bytes.deserialize>>() + map[key] = value + } + // Clean notarisation request log + val deleteQuery = session.criteriaBuilder.createCriteriaDelete(PersistentUniquenessProvider.Request::class.java) + deleteQuery.from(PersistentUniquenessProvider.Request::class.java) + session.createQuery(deleteQuery).executeUpdate() + // Load and populate request log + for (i in 1..reader.readInt()) { + val bytes = ByteArray(reader.readUnsignedShort()) + reader.read(bytes) + val request = bytes.deserialize() + session.persist(request) + } + } + } + + companion object { + private val log = contextLogger() + + // Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: + val serializer: Serializer by lazy { + Serializer().apply { + register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) { + object : TypeSerializer { + override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction, + buffer: BufferOutput>, + serializer: Serializer) { + buffer.writeUnsignedShort(obj.states.size) + with(serializer) { + obj.states.forEach { + writeObject(it, buffer) + } + writeObject(obj.txId, buffer) + } + buffer.writeString(obj.requestingParty) + buffer.writeInt(obj.requestSignature.size) + buffer.write(obj.requestSignature) + } + + override fun read(type: Class, + buffer: BufferInput>, + serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction { + val stateCount = buffer.readUnsignedShort() + val states = (1..stateCount).map { + serializer.readObject(buffer) + } + val txId = serializer.readObject(buffer) + val name = buffer.readString() + val signatureSize = buffer.readInt() + val signature = ByteArray(signatureSize) + buffer.read(signature) + return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature) + } + } + } + register(RaftTransactionCommitLog.Commands.Get::class.java) { + object : TypeSerializer { + override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput>, serializer: Serializer) { + serializer.writeObject(obj.key, buffer) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get { + val key = serializer.readObject(buffer) + return RaftTransactionCommitLog.Commands.Get(key) + } + + } + } + register(StateRef::class.java) { + object : TypeSerializer { + override fun write(obj: StateRef, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeString(obj.encoded()) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): StateRef { + return buffer.readString().parseStateRef() + } + } + } + registerAbstract(SecureHash::class.java) { + object : TypeSerializer { + override fun write(obj: SecureHash, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeUnsignedShort(obj.bytes.size) + buffer.write(obj.bytes) + } + + override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): SecureHash { + val size = buffer.readUnsignedShort() + val bytes = ByteArray(size) + buffer.read(bytes) + return SecureHash.SHA256(bytes) + } + } + } + register(LinkedHashMap::class.java) { + object : TypeSerializer> { + override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput>, serializer: Serializer) { + buffer.writeInt(obj.size) + obj.forEach { + with(serializer) { + writeObject(it.key, buffer) + writeObject(it.value, buffer) + } + } + } + + override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer): LinkedHashMap<*, *> { + return LinkedHashMap().apply { + repeat(buffer.readInt()) { + put(serializer.readObject(buffer), serializer.readObject(buffer)) + } + } + } + } + } + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index f9da07e825..22835d8666 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -2,10 +2,6 @@ package net.corda.node.services.transactions import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry -import io.atomix.catalyst.buffer.BufferInput -import io.atomix.catalyst.buffer.BufferOutput -import io.atomix.catalyst.serializer.Serializer -import io.atomix.catalyst.serializer.TypeSerializer import io.atomix.catalyst.transport.Address import io.atomix.catalyst.transport.Transport import io.atomix.catalyst.transport.netty.NettyTransport @@ -20,14 +16,14 @@ import io.atomix.copycat.server.storage.StorageLevel import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.node.services.UniquenessProvider -import net.corda.core.serialization.SerializationDefaults +import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.node.services.config.RaftConfig @@ -36,11 +32,14 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX -import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import java.nio.file.Path +import java.time.Clock import java.util.concurrent.CompletableFuture import javax.annotation.concurrent.ThreadSafe -import javax.persistence.* +import javax.persistence.Column +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.Table /** * A uniqueness provider that records committed input states in a distributed collection replicated and @@ -51,37 +50,49 @@ import javax.persistence.* * to the cluster leader to be actioned. */ @ThreadSafe -class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfiguration, private val db: CordaPersistence, private val metrics: MetricRegistry, private val raftConfig: RaftConfig) : UniquenessProvider, SingletonSerializeAsToken() { +class RaftUniquenessProvider( + private val transportConfiguration: NodeSSLConfiguration, + private val db: CordaPersistence, + private val clock: Clock, + private val metrics: MetricRegistry, + private val raftConfig: RaftConfig +) : UniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = contextLogger() - fun createMap(): AppendOnlyPersistentMap, RaftState, String> = + fun createMap(): AppendOnlyPersistentMap, CommittedState, PersistentStateRef> = AppendOnlyPersistentMap( - toPersistentEntityKey = { it }, + toPersistentEntityKey = { PersistentStateRef(it) }, fromPersistentEntity = { - Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))) + val txId = it.id.txId + ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val index = it.id.index ?: throw IllegalStateException("DB returned null SecureHash index") + Pair( + StateRef(txhash = SecureHash.parse(txId), index = index), + Pair(it.index, SecureHash.parse(it.value) as SecureHash)) + }, - toPersistentEntity = { k: String, v: Pair -> - RaftState().apply { - key = k - value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes - index = v.first - } + toPersistentEntity = { k: StateRef, v: Pair -> + CommittedState( + PersistentStateRef(k), + v.second.toString(), + v.first) + }, - persistentEntityClass = RaftState::class.java + persistentEntityClass = CommittedState::class.java ) + + fun StateRef.encoded() = "$txhash:$index" + fun String.parseStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) } } @Entity @Table(name = "${NODE_DATABASE_PREFIX}raft_committed_states") - class RaftState( - @Id - @Column(name = "id") - var key: String = "", - - @Lob - @Column(name = "state_value") - var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "state_index") + class CommittedState( + @EmbeddedId + val id: PersistentStateRef, + @Column(name = "consuming_transaction_id") + var value: String = "", + @Column(name = "raft_log_index") var index: Long = 0 ) @@ -100,41 +111,17 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur fun start() { log.info("Creating Copycat server, log stored in: ${storagePath.toFile()}") val stateMachineFactory = { - DistributedImmutableMap(db, RaftUniquenessProvider.Companion::createMap) + RaftTransactionCommitLog(db, clock, RaftUniquenessProvider.Companion::createMap) } val address = raftConfig.nodeAddress.let { Address(it.host, it.port) } val storage = buildStorage(storagePath) val transport = buildTransport(transportConfiguration) - val serializer = Serializer().apply { - // Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: - register(DistributedImmutableMap.Commands.PutAll::class.java) { - object : TypeSerializer> { - override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>, - buffer: BufferOutput>, - serializer: Serializer) { - writeMap(obj.entries, buffer, serializer) - } - - override fun read(type: Class>, - buffer: BufferInput>, - serializer: Serializer): DistributedImmutableMap.Commands.PutAll { - return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer)) - } - } - } - register(LinkedHashMap::class.java) { - object : TypeSerializer> { - override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput>, serializer: Serializer) = writeMap(obj, buffer, serializer) - override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer) = readMap(buffer, serializer) - } - } - } server = CopycatServer.builder(address) .withStateMachine(stateMachineFactory) .withStorage(storage) .withServerTransport(transport) - .withSerializer(serializer) + .withSerializer(RaftTransactionCommitLog.serializer) .build() val serverFuture = if (raftConfig.clusterAddresses.isNotEmpty()) { @@ -151,7 +138,7 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur val client = CopycatClient.builder(address) .withTransport(transport) // TODO: use local transport for client-server communications .withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF) - .withSerializer(serializer) + .withSerializer(RaftTransactionCommitLog.serializer) .withRecoveryStrategy(RecoveryStrategies.RECOVER) .build() _clientFuture = serverFuture.thenCompose { client.connect(address) } @@ -200,50 +187,22 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur } - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { - val entries = states.mapIndexed { i, stateRef -> stateRef to UniquenessProvider.ConsumingTx(txId, i, callerIdentity) } - + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { log.debug("Attempting to commit input states: ${states.joinToString()}") - val commitCommand = DistributedImmutableMap.Commands.PutAll(encode(entries)) + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction( + states, + txId, + callerIdentity.name.toString(), + requestSignature.serialize().bytes + ) val conflicts = client.submit(commitCommand).get() - if (conflicts.isNotEmpty()) { - val conflictingStates = decode(conflicts).mapValues { StateConsumptionDetails(it.value.id.sha256()) } + val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } val error = NotaryError.Conflict(txId, conflictingStates) throw NotaryInternalException(error) } log.debug("All input states of transaction $txId have been committed") } - - /** - * Copycat uses its own serialization framework so we convert and store entries as String -> ByteArray - * here to avoid having to define additional serializers for our custom types. - */ - private fun encode(items: List>): Map { - fun StateRef.encoded() = "$txhash:$index" - return items.map { it.first.encoded() to it.second.serialize().bytes }.toMap() - } - - private fun decode(items: Map): Map { - fun String.toStateRef() = split(":").let { StateRef(SecureHash.parse(it[0]), it[1].toInt()) } - return items.map { it.key.toStateRef() to it.value.deserialize() }.toMap() - } } -private fun writeMap(map: Map<*, *>, buffer: BufferOutput>, serializer: Serializer) = with(map) { - buffer.writeInt(size) - forEach { - with(serializer) { - writeObject(it.key, buffer) - writeObject(it.value, buffer) - } - } -} -private fun readMap(buffer: BufferInput>, serializer: Serializer): LinkedHashMap { - return LinkedHashMap().apply { - repeat(buffer.readInt()) { - put(serializer.readObject(buffer), serializer.readObject(buffer)) - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index f9ace1e95e..65f64d5a8c 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -8,7 +8,7 @@ import java.security.PublicKey /** A simple Notary service that does not perform transaction validation */ class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index 57edcfecaf..a2550ee1de 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -5,12 +5,11 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TransactionVerificationException import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow -import net.corda.core.internal.validateRequest +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction -import net.corda.core.utilities.unwrap import java.security.SignatureException /** @@ -25,9 +24,10 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor * the transaction in question has all required signatures apart from the notary's. */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { try { - val stx = receiveTransaction() + val stx = requestPayload.signedTransaction + validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature) val notary = stx.notary checkNotary(notary) resolveAndContractVerify(stx) @@ -43,15 +43,6 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor } } - @Suspendable - private fun receiveTransaction(): SignedTransaction { - return otherSideSession.receive().unwrap { - val stx = it.signedTransaction - validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature) - stx - } - } - @Suspendable private fun resolveAndContractVerify(stx: SignedTransaction) { subFlow(ResolveTransactionsFlow(stx, otherSideSession)) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 01da0911dd..24f33c539f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -8,7 +8,7 @@ import java.security.PublicKey /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this) diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index 55aa5d06e4..4d866546bc 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -1,9 +1,12 @@ package net.corda.node.services.transactions +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.NullKeys import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 -import net.corda.core.flows.NotaryInternalException +import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryInternalException import net.corda.core.identity.CordaX500Name import net.corda.node.internal.configureDatabase import net.corda.node.services.schema.NodeSchemaService @@ -19,6 +22,7 @@ import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.time.Clock import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -28,6 +32,7 @@ class PersistentUniquenessProviderTests { val testSerialization = SerializationEnvironmentRule() private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val txID = SecureHash.randomSHA256() + private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) private lateinit var database: CordaPersistence @@ -46,23 +51,25 @@ class PersistentUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { database.transaction { - val provider = PersistentUniquenessProvider() + val provider = PersistentUniquenessProvider(Clock.systemUTC()) val inputState = generateStateRef() - provider.commit(listOf(inputState), txID, identity) + provider.commit(listOf(inputState), txID, identity, requestSignature) } } @Test fun `should report a conflict for a transaction with previously used inputs`() { database.transaction { - val provider = PersistentUniquenessProvider() + val provider = PersistentUniquenessProvider(Clock.systemUTC()) val inputState = generateStateRef() val inputs = listOf(inputState) - provider.commit(inputs, txID, identity) + provider.commit(inputs, txID, identity, requestSignature) - val ex = assertFailsWith { provider.commit(inputs, txID, identity) } + val ex = assertFailsWith { + provider.commit(inputs, txID, identity, requestSignature) + } val error = ex.error as NotaryError.Conflict val conflictCause = error.consumedStates[inputState]!! diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt similarity index 65% rename from node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt rename to node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt index d8a7611cb1..e00d5b0c57 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt @@ -6,6 +6,8 @@ import io.atomix.copycat.client.CopycatClient import io.atomix.copycat.server.CopycatServer import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash import net.corda.core.internal.concurrent.asCordaFuture import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.NetworkHostAndPort @@ -14,20 +16,22 @@ import net.corda.node.internal.configureDatabase import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.internal.LogHelper +import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.freeLocalHostAndPort -import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.internal.LogHelper import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.time.Clock import java.util.concurrent.CompletableFuture import kotlin.test.assertEquals import kotlin.test.assertTrue -class DistributedImmutableMapTests { +class RaftTransactionCommitLogTests { data class Member(val client: CopycatClient, val server: CopycatServer) @Rule @@ -40,12 +44,13 @@ class DistributedImmutableMapTests { @Before fun setup() { LogHelper.setLevel("-org.apache.activemq") + LogHelper.setLevel("+io.atomix") cluster = setUpCluster() } @After fun tearDown() { - LogHelper.reset("org.apache.activemq") + LogHelper.reset("org.apache.activemq", "io.atomix") cluster.map { it.client.close().asCordaFuture() }.transpose().getOrThrow() cluster.map { it.server.shutdown().asCordaFuture() }.transpose().getOrThrow() databases.forEach { it.close() } @@ -55,28 +60,38 @@ class DistributedImmutableMapTests { fun `stores entries correctly`() { val client = cluster.last().client - val entries = mapOf("key1" to "value1", "key2" to "value2") + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) - val conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) + val conflict = client.submit(commitCommand).getOrThrow() assertTrue { conflict.isEmpty() } - val value1 = client.submit(DistributedImmutableMap.Commands.Get("key1")) - val value2 = client.submit(DistributedImmutableMap.Commands.Get("key2")) + val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0])) + val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1])) - assertEquals(value1.getOrThrow(), "value1") - assertEquals(value2.getOrThrow(), "value2") + assertEquals(value1.getOrThrow(), txId) + assertEquals(value2.getOrThrow(), txId) } @Test fun `returns conflict for duplicate entries`() { val client = cluster.last().client - val entries = mapOf("key1" to "value1", "key2" to "value2") + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) - var conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) + var conflict = client.submit(commitCommand).getOrThrow() assertTrue { conflict.isEmpty() } - conflict = client.submit(DistributedImmutableMap.Commands.PutAll(entries)).getOrThrow() - assertTrue { conflict == entries } + + conflict = client.submit(commitCommand).getOrThrow() + assertEquals(conflict.keys, states.toSet()) + conflict.forEach { assertEquals(it.value, txId) } } private fun setUpCluster(nodeCount: Int = 3): List { @@ -91,11 +106,12 @@ class DistributedImmutableMapTests { val address = Address(myAddress.host, myAddress.port) val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(serverNameTablePrefix = "PORT_${myAddress.port}_"), rigorousMock(), NodeSchemaService(includeNotarySchemas = true)) databases.add(database) - val stateMachineFactory = { DistributedImmutableMap(database, RaftUniquenessProvider.Companion::createMap) } + val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) } val server = CopycatServer.builder(address) .withStateMachine(stateMachineFactory) .withStorage(storage) + .withSerializer(RaftTransactionCommitLog.serializer) .build() val serverInitFuture = if (clusterAddress != null) { @@ -107,6 +123,7 @@ class DistributedImmutableMapTests { val client = CopycatClient.builder(address) .withConnectionStrategy(ConnectionStrategies.EXPONENTIAL_BACKOFF) + .withSerializer(RaftTransactionCommitLog.serializer) .build() return serverInitFuture.thenCompose { client.connect(address) }.thenApply { Member(it, server) } } diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index 18f2c2c6be..fc58760a5c 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -5,14 +5,13 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TransactionVerificationException import net.corda.core.flows.* import net.corda.core.internal.ResolveTransactionsFlow -import net.corda.core.internal.validateRequest +import net.corda.core.internal.validateRequestSignature import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction -import net.corda.core.utilities.unwrap import net.corda.node.services.transactions.PersistentUniquenessProvider import java.security.PublicKey import java.security.SignatureException @@ -26,7 +25,7 @@ import java.security.SignatureException // START 1 @CordaService class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider() + override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this) @@ -43,9 +42,10 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating * transaction dependency chain. */ @Suspendable - override fun receiveAndVerifyTx(): TransactionParts { + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { try { - val stx = receiveTransaction() + val stx = requestPayload.signedTransaction + validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature) val notary = stx.notary checkNotary(notary) verifySignatures(stx) @@ -61,15 +61,6 @@ class MyValidatingNotaryFlow(otherSide: FlowSession, service: MyCustomValidating } } - @Suspendable - private fun receiveTransaction(): SignedTransaction { - return otherSideSession.receive().unwrap { - val stx = it.signedTransaction - validateRequest(NotarisationRequest(stx.inputs, stx.id), it.requestSignature) - stx - } - } - @Suspendable private fun resolveAndContractVerify(stx: SignedTransaction) { subFlow(ResolveTransactionsFlow(stx, otherSideSession))