diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 9ece3eba26..da5e9001a0 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2123,7 +2123,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l ## @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.node.services.TrustedAuthorityNotaryService extends net.corda.core.node.services.NotaryService public () - public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) + public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow) @org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog() @org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider() @@ -2139,7 +2139,7 @@ public static final class net.corda.core.node.services.TrustedAuthorityNotarySer @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.UniquenessProvider$Conflict getError() ## public interface net.corda.core.node.services.UniquenessProvider - public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature) + public abstract void commit(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party, net.corda.core.flows.NotarisationRequestSignature, net.corda.core.contracts.TimeWindow) ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.node.services.UniquenessProvider$Conflict extends java.lang.Object public (Map) diff --git a/build.gradle b/build.gradle index fd02b98d4d..2e09315ac1 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,7 @@ buildscript { * The issue has been reported to upstream: * https://issues.apache.org/jira/browse/ARTEMIS-1559 */ - ext.artemis_version = '2.2.0' + ext.artemis_version = '2.5.0' ext.jackson_version = '2.9.3' ext.jetty_version = '9.4.7.v20170914' ext.jersey_version = '2.25' @@ -61,7 +61,7 @@ buildscript { ext.caffeine_version = constants.getProperty("caffeineVersion") ext.metrics_version = constants.getProperty("metricsVersion") ext.okhttp_version = '3.5.0' - ext.netty_version = '4.1.9.Final' + ext.netty_version = '4.1.22.Final' ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion") ext.fileupload_version = '1.3.3' ext.junit_version = '4.12' diff --git a/core/src/main/kotlin/net/corda/core/contracts/Structures.kt b/core/src/main/kotlin/net/corda/core/contracts/Structures.kt index 5df89d72fe..e672218ab2 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/Structures.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/Structures.kt @@ -72,7 +72,7 @@ const val MAX_ISSUER_REF_SIZE = 512 * cares about specific issuers with code that will accept any, or which is imposing issuer constraints via some * other mechanism and the additional type safety is not wanted. */ -fun Amount>.withoutIssuer(): Amount = Amount(quantity, token.product) +fun Amount>.withoutIssuer(): Amount = Amount(quantity, displayTokenSize, token.product) // DOCSTART 3 diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index bef1d627cf..61d256bc5e 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -156,8 +156,8 @@ class NotaryFlow { try { val parts = validateRequest(requestPayload) txId = parts.id - service.validateTimeWindow(parts.timestamp) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature) + checkNotary(parts.notary) + service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp) signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) diff --git a/core/src/main/kotlin/net/corda/core/internal/NotaryUtils.kt b/core/src/main/kotlin/net/corda/core/internal/NotaryUtils.kt index a014e8ba17..21e639c2e1 100644 --- a/core/src/main/kotlin/net/corda/core/internal/NotaryUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/NotaryUtils.kt @@ -1,9 +1,14 @@ package net.corda.core.internal +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.crypto.isFulfilledBy import net.corda.core.flows.NotarisationResponse +import net.corda.core.flows.NotaryError +import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party +import java.time.Instant /** * Checks that there are sufficient signatures to satisfy the notary signing requirement and validates the signatures @@ -13,4 +18,19 @@ fun NotarisationResponse.validateSignatures(txId: SecureHash, notary: Party) { val signingKeys = signatures.map { it.by } require(notary.owningKey.isFulfilledBy(signingKeys)) { "Insufficient signatures to fulfill the notary signing requirement for $notary" } signatures.forEach { it.verify(txId) } +} + +/** Checks if the provided states were used as inputs in the specified transaction. */ +fun isConsumedByTheSameTx(txIdHash: SecureHash, consumedStates: Map): Boolean { + val conflicts = consumedStates.filter { (_, cause) -> + cause.hashOfTransactionId != txIdHash + } + return conflicts.isEmpty() +} + +/** Returns [NotaryError.TimeWindowInvalid] if [currentTime] is outside the [timeWindow], and *null* otherwise. */ +fun validateTimeWindow(currentTime: Instant, timeWindow: TimeWindow?): NotaryError.TimeWindowInvalid? { + return if (timeWindow != null && currentTime !in timeWindow) { + NotaryError.TimeWindowInvalid(currentTime, timeWindow) + } else null } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt index b2780660ce..116f7db74b 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -27,6 +27,7 @@ abstract class NotaryService : SingletonSerializeAsToken() { companion object { @Deprecated("No longer used") const val ID_PREFIX = "corda.notary." + @Deprecated("No longer used") fun constructId(validating: Boolean, raft: Boolean = false, bft: Boolean = false, custom: Boolean = false): String { require(Booleans.countTrue(raft, bft, custom) <= 1) { "At most one of raft, bft or custom may be true" } @@ -89,9 +90,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that * this method does not throw an exception when input states are present multiple times within the transaction. */ - fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature) { + fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { try { - uniquenessProvider.commit(inputs, txId, caller, requestSignature) + uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow) } catch (e: NotaryInternalException) { if (e.error is NotaryError.Conflict) { val conflicts = inputs.filterIndexed { _, stateRef -> diff --git a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt index 7523ab888f..bbe132d134 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/UniquenessProvider.kt @@ -12,6 +12,7 @@ package net.corda.core.node.services import net.corda.core.CordaException import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party @@ -25,7 +26,13 @@ import net.corda.core.serialization.CordaSerializable */ interface UniquenessProvider { /** Commits all input states of the given transaction. */ - fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) + fun commit( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow? = null + ) /** Specifies the consuming transaction for every conflicting state. */ @CordaSerializable diff --git a/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/ScenarioState.kt b/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/ScenarioState.kt index 3f6e0d91e0..0068b9bb63 100644 --- a/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/ScenarioState.kt +++ b/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/ScenarioState.kt @@ -11,7 +11,6 @@ package net.corda.behave.scenarios import cucumber.api.java.After -import net.corda.behave.logging.getLogger import net.corda.behave.network.Network import net.corda.behave.node.Node import net.corda.core.messaging.CordaRPCOps @@ -20,8 +19,6 @@ import java.time.Duration class ScenarioState { - private val log = getLogger() - private val nodes = mutableListOf() private var network: Network? = null diff --git a/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/helpers/Substeps.kt b/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/helpers/Substeps.kt index d22201bc84..b9b7ad8044 100644 --- a/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/helpers/Substeps.kt +++ b/experimental/behave/src/scenario/kotlin/net/corda/behave/scenarios/helpers/Substeps.kt @@ -10,13 +10,13 @@ package net.corda.behave.scenarios.helpers -import net.corda.behave.logging.getLogger import net.corda.behave.scenarios.ScenarioState import net.corda.core.messaging.CordaRPCOps +import net.corda.core.utilities.contextLogger abstract class Substeps(protected val state: ScenarioState) { - protected val log = getLogger() + protected val log = contextLogger() protected fun withNetwork(action: ScenarioState.() -> Unit) = state.withNetwork(action) diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt index 93416d3f5d..9b764ea445 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt @@ -51,8 +51,9 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v cashSelectionAlgo?.let { instance.set(cashSelectionAlgo) cashSelectionAlgo - } ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver ($_metadata)." + - "\nPlease specify an implementation in META-INF/services/${AbstractCashSelection::class.java}") + } ?: throw ClassNotFoundException("\nUnable to load compatible cash selection algorithm implementation for JDBC driver name '${_metadata.driverName}'." + + "\nPlease specify an implementation in META-INF/services/${AbstractCashSelection::class.qualifiedName}." + + "\nAvailable implementations: $cashSelectionAlgos") }.invoke() } diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt index e32bb95a04..dd4d0f8659 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt @@ -32,7 +32,7 @@ class CashSelectionH2Impl : AbstractCashSelection() { return metadata.driverName == JDBC_DRIVER_NAME } - override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME" + override fun toString() = "${this::class.qualifiedName} for '$JDBC_DRIVER_NAME'" // We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins: // 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt deleted file mode 100644 index 8cfb48124a..0000000000 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionMySQLImpl.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.finance.contracts.asset.cash.selection - -import net.corda.core.contracts.Amount -import net.corda.core.identity.AbstractParty -import net.corda.core.identity.Party -import net.corda.core.utilities.OpaqueBytes -import java.sql.Connection -import java.sql.DatabaseMetaData -import java.sql.ResultSet -import java.util.* - -class CashSelectionMySQLImpl : AbstractCashSelection() { - - companion object { - const val JDBC_DRIVER_NAME = "MySQL JDBC Driver" - } - - override fun isCompatible(metadata: DatabaseMetaData): Boolean { - return metadata.driverName == JDBC_DRIVER_NAME - } - - override fun executeQuery(statement: Connection, amount: Amount, lockId: UUID, notary: Party?, issuerKeysStr: Set, issuerRefsStr: Set, withResultSet: (ResultSet) -> Boolean): Boolean { - TODO("MySQL cash selection not implemented") - } - - override fun toString() = "${this::class.java} for ${CashSelectionH2Impl.JDBC_DRIVER_NAME}" -} \ No newline at end of file diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt index 6f85002a6c..d219fb3a2e 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt @@ -23,7 +23,7 @@ import java.util.* class CashSelectionPostgreSQLImpl : AbstractCashSelection() { companion object { - val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver" + const val JDBC_DRIVER_NAME = "PostgreSQL JDBC Driver" private val log = contextLogger() } @@ -31,7 +31,7 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { return metadata.driverName == JDBC_DRIVER_NAME } - override fun toString() = "${this::class.java} for $JDBC_DRIVER_NAME" + override fun toString() = "${this::class.qualifiedName} for '$JDBC_DRIVER_NAME'" // This is using PostgreSQL window functions for selecting a minimum set of rows that match a request amount of coins: // 1) This may also be possible with user-defined functions (e.g. using PL/pgSQL) diff --git a/finance/src/main/resources/META-INF/services/net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection b/finance/src/main/resources/META-INF/services/net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection index f622134554..11c20fb11e 100644 --- a/finance/src/main/resources/META-INF/services/net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection +++ b/finance/src/main/resources/META-INF/services/net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection @@ -9,7 +9,6 @@ # net.corda.finance.contracts.asset.cash.selection.CashSelectionH2Impl -net.corda.finance.contracts.asset.cash.selection.CashSelectionMySQLImpl net.corda.finance.contracts.asset.cash.selection.CashSelectionPostgreSQLImpl net.corda.finance.contracts.asset.cash.selection.CashSelectionSQLServerImpl net.corda.finance.contracts.asset.cash.selection.CashSelectionOracleImpl diff --git a/node-api/build.gradle b/node-api/build.gradle index 3ab4537b34..921971a446 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -30,6 +30,9 @@ dependencies { compile "org.apache.activemq:artemis-core-client:${artemis_version}" compile "org.apache.activemq:artemis-commons:${artemis_version}" + // Netty: All of it. + compile "io.netty:netty-all:$netty_version" + // For adding serialisation of file upload streams to RPC // TODO: Remove this dependency and the code that requires it compile "commons-fileupload:commons-fileupload:$fileupload_version" diff --git a/node/build.gradle b/node/build.gradle index 7e64573862..6beecc4333 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -157,12 +157,9 @@ dependencies { compileOnly "co.paralleluniverse:capsule:$capsule_version" // Java Atomix: RAFT library - compile 'io.atomix.copycat:copycat-client:1.2.3' - compile 'io.atomix.copycat:copycat-server:1.2.3' - compile 'io.atomix.catalyst:catalyst-netty:1.1.2' - - // Netty: All of it. - compile "io.netty:netty-all:$netty_version" + compile 'io.atomix.copycat:copycat-client:1.2.8' + compile 'io.atomix.copycat:copycat-server:1.2.8' + compile 'io.atomix.catalyst:catalyst-netty:1.2.1' // OkHTTP: Simple HTTP library. compile "com.squareup.okhttp3:okhttp:$okhttp_version" diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt index 54fb916a6e..44e4237dd8 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -15,8 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateRef -import net.corda.core.crypto.CompositeKey -import net.corda.core.crypto.sha256 +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.* import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryException import net.corda.core.flows.NotaryFlow @@ -41,97 +41,85 @@ import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.contracts.DummyContract import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.PortAllocation +import net.corda.testing.node.TestClock import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork.MockNode import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.startFlow -import org.junit.After -import org.junit.Before -import org.junit.ClassRule -import org.junit.Test +import org.hamcrest.Matchers.instanceOf +import org.junit.* +import org.junit.Assert.assertThat import java.nio.file.Paths +import java.time.Duration +import java.time.Instant +import java.util.* +import java.util.concurrent.ExecutionException import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertTrue -class BFTNotaryServiceTests : IntegrationTest() { +class BFTNotaryServiceTests { companion object { + private lateinit var mockNet: InternalMockNetwork + private lateinit var notary: Party + private lateinit var node: StartedNode + @ClassRule @JvmField val databaseSchemas = IntegrationTestSchemas("node_0", "node_1", "node_2", "node_3", "node_4", "node_5", "node_6", "node_7", "node_8", "node_9") - } - private lateinit var mockNet: InternalMockNetwork - private lateinit var notary: Party - private lateinit var node: StartedNode - - @Before - fun before() { - mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts")) - } - - @After - fun stopNodes() { - mockNet.stopNodes() - } - - private fun startBftClusterAndNode(clusterSize: Int, exposeRaces: Boolean = false) { - (Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists? - val replicaIds = (0 until clusterSize) - - notary = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( - replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, - CordaX500Name("BFT", "Zurich", "CH")) - - val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notary, false)))) - - val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) } - - val nodes = replicaIds.map { replicaId -> - mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = { - val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces)) - doReturn(notary).whenever(it).notary - })) - } + mockNet.createUnstartedNode() - - // MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the - // network-parameters in their directories before they're started. - node = nodes.map { node -> - networkParameters.install(mockNet.baseDirectory(node.id)) - node.start() - }.last() - } - - /** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */ - @Test - fun `all replicas start even if there is a new consensus during startup`() { - startBftClusterAndNode(minClusterSize(1), exposeRaces = true) // This true adds a sleep to expose the race. - val f = node.run { - val trivialTx = signInitialTransaction(notary) { - addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) - } - // Create a new consensus while the redundant replica is sleeping: - services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture + @BeforeClass + @JvmStatic + fun before() { + mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts")) + val clusterSize = minClusterSize(1) + val started = startBftClusterAndNode(clusterSize, mockNet) + notary = started.first + node = started.second + } + + @AfterClass + @JvmStatic + fun stopNodes() { + mockNet.stopNodes() + } + + fun startBftClusterAndNode(clusterSize: Int, mockNet: InternalMockNetwork, exposeRaces: Boolean = false): Pair> { + (Paths.get("config") / "currentView").deleteIfExists() // XXX: Make config object warn if this exists? + val replicaIds = (0 until clusterSize) + + val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( + replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, + CordaX500Name("BFT", "Zurich", "CH")) + + val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false)))) + + val clusterAddresses = replicaIds.map { NetworkHostAndPort("localhost", 11000 + it * 10) } + + val nodes = replicaIds.map { replicaId -> + mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = { + val notary = NotaryConfig(validating = false, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses, exposeRaces = exposeRaces)) + doReturn(notary).whenever(it).notary + })) + } + mockNet.createUnstartedNode() + + // MockNetwork doesn't support BFT clusters, so we create all the nodes we need unstarted, and then install the + // network-parameters in their directories before they're started. + val node = nodes.map { node -> + networkParameters.install(mockNet.baseDirectory(node.id)) + node.start() + }.last() + + return Pair(notaryIdentity, node) } - mockNet.runNetwork() - f.getOrThrow() } @Test - fun `detect double spend 1 faulty`() { - detectDoubleSpend(1) - } - - @Test - fun `detect double spend 2 faulty`() { - detectDoubleSpend(2) - } - - private fun detectDoubleSpend(faultyReplicas: Int) { - val clusterSize = minClusterSize(faultyReplicas) - startBftClusterAndNode(clusterSize) + fun `detect double spend`() { node.run { val issueTx = signInitialTransaction(notary) { addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) @@ -152,7 +140,7 @@ class BFTNotaryServiceTests : IntegrationTest() { val successfulIndex = results.mapIndexedNotNull { index, result -> if (result is Try.Success) { val signers = result.value.map { it.by } - assertEquals(minCorrectReplicas(clusterSize), signers.size) + assertEquals(minCorrectReplicas(3), signers.size) signers.forEach { assertTrue(it in (notary.owningKey as CompositeKey).leafKeys) } @@ -174,6 +162,63 @@ class BFTNotaryServiceTests : IntegrationTest() { } } + @Test + fun `transactions outside their time window are rejected`() { + node.run { + val issueTx = signInitialTransaction(notary) { + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + database.transaction { + services.recordTransactions(issueTx) + } + val spendTx = signInitialTransaction(notary) { + addInputState(issueTx.tx.outRef(0)) + setTimeWindow(TimeWindow.fromOnly(Instant.MAX)) + } + val flow = NotaryFlow.Client(spendTx) + val resultFuture = services.startFlow(flow).resultFuture + mockNet.runNetwork() + val exception = assertFailsWith { resultFuture.get() } + assertThat(exception.cause, instanceOf(NotaryException::class.java)) + val error = (exception.cause as NotaryException).error + assertThat(error, instanceOf(NotaryError.TimeWindowInvalid::class.java)) + } + } + + @Test + fun `transactions can be re-notarised outside their time window`() { + node.run { + val issueTx = signInitialTransaction(notary) { + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + database.transaction { + services.recordTransactions(issueTx) + } + val spendTx = signInitialTransaction(notary) { + addInputState(issueTx.tx.outRef(0)) + setTimeWindow(TimeWindow.untilOnly(Instant.now() + Duration.ofHours(1))) + } + val resultFuture = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture + mockNet.runNetwork() + val signatures = resultFuture.get() + verifySignatures(signatures, spendTx.id) + + for (node in mockNet.nodes) { + (node.started!!.services.clock as TestClock).advanceBy(Duration.ofDays(1)) + } + + val resultFuture2 = services.startFlow(NotaryFlow.Client(spendTx)).resultFuture + mockNet.runNetwork() + val signatures2 = resultFuture2.get() + verifySignatures(signatures2, spendTx.id) + } + } + + private fun verifySignatures(signatures: List, txId: SecureHash) { + notary.owningKey.isFulfilledBy(signatures.map { it.by }) + signatures.forEach { it.verify(txId) } + } + private fun StartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { return services.signInitialTransaction( TransactionBuilder(notary).apply { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt new file mode 100644 index 0000000000..3f947a6d33 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt @@ -0,0 +1,59 @@ +package net.corda.node.services + +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.flows.NotaryFlow +import net.corda.core.identity.Party +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.StartedNode +import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode +import net.corda.node.services.transactions.minClusterSize +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.dummyCommand +import net.corda.testing.core.singleIdentity +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNetwork.MockNode +import net.corda.testing.node.internal.startFlow +import org.junit.After +import org.junit.Before +import org.junit.Test + +class BFTSMaRtTests { + private lateinit var mockNet: InternalMockNetwork + + @Before + fun before() { + mockNet = InternalMockNetwork(listOf("net.corda.testing.contracts")) + } + + @After + fun stopNodes() { + mockNet.stopNodes() + } + + /** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */ + @Test + fun `all replicas start even if there is a new consensus during startup`() { + val clusterSize = minClusterSize(1) + val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race. + val f = node.run { + val trivialTx = signInitialTransaction(notary) { + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + // Create a new consensus while the redundant replica is sleeping: + services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture + } + mockNet.runNetwork() + f.getOrThrow() + } + + private fun StartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { + return services.signInitialTransaction( + TransactionBuilder(notary).apply { + addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey)) + block() + } + ) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/distributed/DistributedServiceTests.kt similarity index 99% rename from node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt rename to node/src/integration-test/kotlin/net/corda/node/services/distributed/DistributedServiceTests.kt index c83010727d..285ac7a090 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/distributed/DistributedServiceTests.kt @@ -8,7 +8,7 @@ * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. */ -package net.corda.node.services +package net.corda.node.services.distributed import net.corda.client.rpc.CordaRPCClient import net.corda.core.contracts.Amount diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 3f897aa782..c76545bb4d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -149,9 +149,9 @@ class BFTNonValidatingNotaryService( val id = transaction.id val inputs = transaction.inputs val notary = transaction.notary - if (transaction is FilteredTransaction) NotaryService.validateTimeWindow(services.clock, transaction.timeWindow) + val timeWindow = (transaction as? FilteredTransaction)?.timeWindow if (notary !in services.myInfo.legalIdentities) throw NotaryInternalException(NotaryError.WrongNotary) - commitInputStates(inputs, id, callerIdentity.name, requestSignature) + commitInputStates(inputs, id, callerIdentity.name, requestSignature, timeWindow) log.debug { "Inputs committed successfully, signing $id" } BFTSMaRt.ReplicaResponse.Signature(sign(id)) } catch (e: NotaryInternalException) { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 99a7742bef..4a9ac11c66 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -23,12 +23,15 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable import bftsmart.tom.server.defaultservices.DefaultReplier import bftsmart.tom.util.Extractor import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.* import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.declaredField +import net.corda.core.internal.isConsumedByTheSameTx import net.corda.core.internal.toTypedArray +import net.corda.core.internal.validateTimeWindow import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -226,25 +229,27 @@ object BFTSMaRt { */ abstract fun executeCommand(command: ByteArray): ByteArray? - protected fun commitInputStates(states: List, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature) { + protected fun commitInputStates(states: List, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { log.debug { "Attempting to commit inputs for transaction: $txId" } - - val conflicts = mutableMapOf() services.database.transaction { logRequest(txId, callerName, requestSignature) - states.forEach { state -> - commitLog[state]?.let { conflicts[state] = it } + val conflictingStates = LinkedHashMap() + for (state in states) { + commitLog[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.sha256()) } } - if (conflicts.isEmpty()) { - log.debug { "No conflicts detected, committing input states: ${states.joinToString()}" } - states.forEach { stateRef -> - commitLog[stateRef] = txId + if (conflictingStates.isNotEmpty()) { + if (!isConsumedByTheSameTx(txId.sha256(), conflictingStates)) { + log.debug { "Failure, input states already committed: ${conflictingStates.keys}" } + throw NotaryInternalException(NotaryError.Conflict(txId, conflictingStates)) } } else { - log.debug { "Conflict detected – the following inputs have already been committed: ${conflicts.keys.joinToString()}" } - val conflict = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } - val error = NotaryError.Conflict(txId, conflict) - throw NotaryInternalException(error) + val outsideTimeWindowError = validateTimeWindow(services.clock.instant(), timeWindow) + if (outsideTimeWindowError == null) { + states.forEach { commitLog[it] = txId } + log.debug { "Successfully committed all input states: $states" } + } else { + throw NotaryInternalException(outsideTimeWindowError) + } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 043ac9ac44..23d155bfc3 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -11,6 +11,7 @@ package net.corda.node.services.transactions import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 import net.corda.core.flows.NotarisationRequestSignature @@ -19,12 +20,15 @@ import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox +import net.corda.core.internal.isConsumedByTheSameTx +import net.corda.core.internal.validateTimeWindow import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession @@ -74,7 +78,7 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash) private class InnerState { - val committedStates = createMap() + val commitLog = createMap() } private val mutex = ThreadBox(InnerState()) @@ -105,10 +109,21 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl ) } - override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { - logRequest(txId, callerIdentity, requestSignature) - val conflict = commitStates(states, txId) - if (conflict != null) throw NotaryInternalException(NotaryError.Conflict(txId, conflict)) + override fun commit( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?) { + mutex.locked { + logRequest(txId, callerIdentity, requestSignature) + val conflictingStates = findAlreadyCommitted(states, commitLog) + if (conflictingStates.isNotEmpty()) { + handleConflicts(txId, conflictingStates) + } else { + handleNoConflicts(timeWindow, states, txId, commitLog) + } + } } private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { @@ -122,25 +137,35 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl session.persist(request) } - private fun commitStates(states: List, txId: SecureHash): Map? { - val conflict = mutex.locked { - val conflictingStates = LinkedHashMap() - for (inputState in states) { - val consumingTx = committedStates[inputState] - if (consumingTx != null) conflictingStates[inputState] = consumingTx - } - if (conflictingStates.isNotEmpty()) { - log.debug("Failure, input states already committed: ${conflictingStates.keys}") - val conflict = conflictingStates.mapValues { (_, txId) -> StateConsumptionDetails(txId.sha256()) } - conflict - } else { - states.forEach { stateRef -> - committedStates[stateRef] = txId - } - log.debug("Successfully committed all input states: $states") - null - } + private fun findAlreadyCommitted(states: List, commitLog: AppendOnlyPersistentMap): LinkedHashMap { + val conflictingStates = LinkedHashMap() + for (inputState in states) { + val consumingTx = commitLog[inputState] + if (consumingTx != null) conflictingStates[inputState] = StateConsumptionDetails(consumingTx.sha256()) + } + return conflictingStates + } + + private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap) { + if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) { + log.debug { "Transaction $txId already notarised" } + return + } else { + log.debug { "Failure, input states already committed: ${conflictingStates.keys}" } + val conflictError = NotaryError.Conflict(txId, conflictingStates) + throw NotaryInternalException(conflictError) + } + } + + private fun handleNoConflicts(timeWindow: TimeWindow?, states: List, txId: SecureHash, commitLog: AppendOnlyPersistentMap) { + val outsideTimeWindowError = validateTimeWindow(clock.instant(), timeWindow) + if (outsideTimeWindowError == null) { + states.forEach { stateRef -> + commitLog[stateRef] = txId + } + log.debug { "Successfully committed all input states: $states" } + } else { + throw NotaryInternalException(outsideTimeWindowError) } - return conflict } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt index 8a95adb9b4..e2b837c486 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLog.kt @@ -22,16 +22,25 @@ import io.atomix.copycat.server.StateMachine import io.atomix.copycat.server.storage.snapshot.SnapshotReader import io.atomix.copycat.server.storage.snapshot.SnapshotWriter import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.sha256 +import net.corda.core.flows.NotaryError +import net.corda.core.flows.StateConsumptionDetails +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.isConsumedByTheSameTx +import net.corda.core.internal.validateTimeWindow import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SerializationFactory import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.contextLogger -import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.encoded -import net.corda.node.services.transactions.RaftUniquenessProvider.Companion.parseStateRef +import net.corda.core.utilities.debug import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.currentDBSession +import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding import java.time.Clock /** @@ -41,8 +50,8 @@ import java.time.Clock * State re-synchronisation is achieved by replaying the command log to the new (or re-joining) cluster member. */ class RaftTransactionCommitLog( - val db: CordaPersistence, - val nodeClock: Clock, + private val db: CordaPersistence, + private val nodeClock: Clock, createMap: () -> AppendOnlyPersistentMap, E, EK> ) : StateMachine(), Snapshottable { object Commands { @@ -50,8 +59,9 @@ class RaftTransactionCommitLog( val states: List, val txId: SecureHash, val requestingParty: String, - val requestSignature: ByteArray - ) : Command> { + val requestSignature: ByteArray, + val timeWindow: TimeWindow? = null + ) : Command { override fun compaction(): Command.CompactionMode { // The FULL compaction mode retains the command in the log until it has been stored and applied on all // servers in the cluster. Once the commit has been applied to a state machine and closed it may be @@ -72,25 +82,38 @@ class RaftTransactionCommitLog( private val map = db.transaction { createMap() } /** Commits the input states for the transaction as specified in the given [Commands.CommitTransaction]. */ - fun commitTransaction(raftCommit: Commit): Map { + fun commitTransaction(raftCommit: Commit): NotaryError? { raftCommit.use { val index = it.index() - val conflicts = LinkedHashMap() - db.transaction { + return db.transaction { val commitCommand = raftCommit.command() logRequest(commitCommand) val states = commitCommand.states val txId = commitCommand.txId log.debug("State machine commit: storing entries with keys (${states.joinToString()})") + val conflictingStates = LinkedHashMap() for (state in states) { - map[state]?.let { conflicts[state] = it.second } + map[state]?.let { conflictingStates[state] = StateConsumptionDetails(it.second.sha256()) } } - if (conflicts.isEmpty()) { - val entries = states.map { it to Pair(index, txId) }.toMap() - map.putAll(entries) + if (conflictingStates.isNotEmpty()) { + if (isConsumedByTheSameTx(commitCommand.txId.sha256(), conflictingStates)) { + null + } else { + log.debug { "Failure, input states already committed: ${conflictingStates.keys}" } + NotaryError.Conflict(txId, conflictingStates) + } + } else { + val outsideTimeWindowError = validateTimeWindow(clock.instant(), commitCommand.timeWindow) + if (outsideTimeWindowError == null) { + val entries = states.map { it to Pair(index, txId) }.toMap() + map.putAll(entries) + log.debug { "Successfully committed all input states: $states" } + null + } else { + outsideTimeWindowError + } } } - return conflicts } } @@ -169,103 +192,35 @@ class RaftTransactionCommitLog( companion object { private val log = contextLogger() - // Add custom serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: + @VisibleForTesting val serializer: Serializer by lazy { Serializer().apply { - register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java) { - object : TypeSerializer { - override fun write(obj: RaftTransactionCommitLog.Commands.CommitTransaction, - buffer: BufferOutput>, - serializer: Serializer) { - buffer.writeUnsignedShort(obj.states.size) - with(serializer) { - obj.states.forEach { - writeObject(it, buffer) - } - writeObject(obj.txId, buffer) - } - buffer.writeString(obj.requestingParty) - buffer.writeInt(obj.requestSignature.size) - buffer.write(obj.requestSignature) - } + registerAbstract(SecureHash::class.java, CordaKryoSerializer::class.java) + registerAbstract(TimeWindow::class.java, CordaKryoSerializer::class.java) + registerAbstract(NotaryError::class.java, CordaKryoSerializer::class.java) + register(RaftTransactionCommitLog.Commands.CommitTransaction::class.java, CordaKryoSerializer::class.java) + register(RaftTransactionCommitLog.Commands.Get::class.java, CordaKryoSerializer::class.java) + register(StateRef::class.java, CordaKryoSerializer::class.java) + register(LinkedHashMap::class.java, CordaKryoSerializer::class.java) + } + } - override fun read(type: Class, - buffer: BufferInput>, - serializer: Serializer): RaftTransactionCommitLog.Commands.CommitTransaction { - val stateCount = buffer.readUnsignedShort() - val states = (1..stateCount).map { - serializer.readObject(buffer) - } - val txId = serializer.readObject(buffer) - val name = buffer.readString() - val signatureSize = buffer.readInt() - val signature = ByteArray(signatureSize) - buffer.read(signature) - return RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, name, signature) - } - } - } - register(RaftTransactionCommitLog.Commands.Get::class.java) { - object : TypeSerializer { - override fun write(obj: RaftTransactionCommitLog.Commands.Get, buffer: BufferOutput>, serializer: Serializer) { - serializer.writeObject(obj.key, buffer) - } + class CordaKryoSerializer : TypeSerializer { + private val context = SerializationDefaults.CHECKPOINT_CONTEXT.withEncoding(CordaSerializationEncoding.SNAPPY) + private val factory = SerializationFactory.defaultFactory - override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): RaftTransactionCommitLog.Commands.Get { - val key = serializer.readObject(buffer) - return RaftTransactionCommitLog.Commands.Get(key) - } + override fun write(obj: T, buffer: BufferOutput<*>, serializer: Serializer) { + val serialized = obj.serialize(context = context) + buffer.writeInt(serialized.size) + buffer.write(serialized.bytes) + } - } - } - register(StateRef::class.java) { - object : TypeSerializer { - override fun write(obj: StateRef, buffer: BufferOutput>, serializer: Serializer) { - buffer.writeString(obj.encoded()) - } - - override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): StateRef { - return buffer.readString().parseStateRef() - } - } - } - registerAbstract(SecureHash::class.java) { - object : TypeSerializer { - override fun write(obj: SecureHash, buffer: BufferOutput>, serializer: Serializer) { - buffer.writeUnsignedShort(obj.bytes.size) - buffer.write(obj.bytes) - } - - override fun read(type: Class, buffer: BufferInput>, serializer: Serializer): SecureHash { - val size = buffer.readUnsignedShort() - val bytes = ByteArray(size) - buffer.read(bytes) - return SecureHash.SHA256(bytes) - } - } - } - register(LinkedHashMap::class.java) { - object : TypeSerializer> { - override fun write(obj: LinkedHashMap<*, *>, buffer: BufferOutput>, serializer: Serializer) { - buffer.writeInt(obj.size) - obj.forEach { - with(serializer) { - writeObject(it.key, buffer) - writeObject(it.value, buffer) - } - } - } - - override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer): LinkedHashMap<*, *> { - return LinkedHashMap().apply { - repeat(buffer.readInt()) { - put(serializer.readObject(buffer), serializer.readObject(buffer)) - } - } - } - } - } + override fun read(type: Class, buffer: BufferInput<*>, serializer: Serializer): T { + val size = buffer.readInt() + val serialized = ByteArray(size) + buffer.read(serialized) + return factory.deserialize(ByteSequence.of(serialized), type, context) } } } -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index 83001812bc..6e3a50d4e1 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -24,19 +24,19 @@ import io.atomix.copycat.server.cluster.Member import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.sha256 import net.corda.core.flows.NotarisationRequestSignature -import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryInternalException -import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.node.services.config.RaftConfig +import net.corda.node.services.transactions.RaftTransactionCommitLog.Commands.CommitTransaction import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration @@ -197,22 +197,23 @@ class RaftUniquenessProvider( }) } - - override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { - log.debug("Attempting to commit input states: ${states.joinToString()}") - val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction( + override fun commit( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?) { + log.debug { "Attempting to commit input states: ${states.joinToString()}" } + val commitCommand = CommitTransaction( states, txId, callerIdentity.name.toString(), - requestSignature.serialize().bytes + requestSignature.serialize().bytes, + timeWindow ) - val conflicts = client.submit(commitCommand).get() - if (conflicts.isNotEmpty()) { - val conflictingStates = conflicts.mapValues { StateConsumptionDetails(it.value.sha256()) } - val error = NotaryError.Conflict(txId, conflictingStates) - throw NotaryInternalException(error) - } - log.debug("All input states of transaction $txId have been committed") + val commitError = client.submit(commitCommand).get() + if (commitError != null) throw NotaryInternalException(commitError) + log.debug { "All input states of transaction $txId have been committed" } } } diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index 92208a850d..1ae68528f9 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -75,15 +75,17 @@ class PersistentUniquenessProviderTests { val inputState = generateStateRef() val inputs = listOf(inputState) - provider.commit(inputs, txID, identity, requestSignature) + val firstTxId = txID + provider.commit(inputs, firstTxId, identity, requestSignature) + val secondTxId = SecureHash.randomSHA256() val ex = assertFailsWith { - provider.commit(inputs, txID, identity, requestSignature) + provider.commit(inputs, secondTxId, identity, requestSignature) } val error = ex.error as NotaryError.Conflict val conflictCause = error.consumedStates[inputState]!! - assertEquals(conflictCause.hashOfTransactionId, txID.sha256()) + assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) } } } diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt index fbf9fbef5c..815eef3daa 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/RaftTransactionCommitLogTests.kt @@ -17,7 +17,9 @@ import io.atomix.copycat.server.CopycatServer import io.atomix.copycat.server.storage.Storage import io.atomix.copycat.server.storage.StorageLevel import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotaryError import net.corda.core.internal.concurrent.asCordaFuture import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.NetworkHostAndPort @@ -32,14 +34,14 @@ import net.corda.testing.core.freeLocalHostAndPort import net.corda.testing.internal.LogHelper import net.corda.testing.internal.rigorousMock import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties -import org.junit.After -import org.junit.Before -import org.junit.Rule -import org.junit.Test +import org.hamcrest.Matchers.instanceOf +import org.junit.* +import org.junit.Assert.assertThat import java.time.Clock +import java.time.Instant import java.util.concurrent.CompletableFuture import kotlin.test.assertEquals -import kotlin.test.assertTrue +import kotlin.test.assertNull class RaftTransactionCommitLogTests { data class Member(val client: CopycatClient, val server: CopycatServer) @@ -76,8 +78,8 @@ class RaftTransactionCommitLogTests { val requestSignature = ByteArray(1024) val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) - val conflict = client.submit(commitCommand).getOrThrow() - assertTrue { conflict.isEmpty() } + val commitError = client.submit(commitCommand).getOrThrow() + assertNull(commitError) val value1 = client.submit(RaftTransactionCommitLog.Commands.Get(states[0])) val value2 = client.submit(RaftTransactionCommitLog.Commands.Get(states[1])) @@ -91,17 +93,60 @@ class RaftTransactionCommitLogTests { val client = cluster.last().client val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) - val txId: SecureHash = SecureHash.randomSHA256() + val txIdFirst = SecureHash.randomSHA256() + val txIdSecond = SecureHash.randomSHA256() val requestingPartyName = ALICE_NAME val requestSignature = ByteArray(1024) - val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction(states, txId, requestingPartyName.toString(), requestSignature) - var conflict = client.submit(commitCommand).getOrThrow() - assertTrue { conflict.isEmpty() } + val commitCommandFirst = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdFirst, requestingPartyName.toString(), requestSignature) + var commitError = client.submit(commitCommandFirst).getOrThrow() + assertNull(commitError) - conflict = client.submit(commitCommand).getOrThrow() - assertEquals(conflict.keys, states.toSet()) - conflict.forEach { assertEquals(it.value, txId) } + val commitCommandSecond = RaftTransactionCommitLog.Commands.CommitTransaction(states, txIdSecond, requestingPartyName.toString(), requestSignature) + commitError = client.submit(commitCommandSecond).getOrThrow() + val conflict = commitError as NotaryError.Conflict + assertEquals(states.toSet(), conflict.consumedStates.keys) + } + + @Test + fun `transactions outside their time window are rejected`() { + val client = cluster.last().client + + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) + val timeWindow = TimeWindow.fromOnly(Instant.MAX) + + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction( + states, txId, requestingPartyName.toString(), requestSignature, timeWindow + ) + val commitError = client.submit(commitCommand).getOrThrow() + assertThat(commitError, instanceOf(NotaryError.TimeWindowInvalid::class.java)) + } + + @Test + fun `transactions can be re-notarised outside their time window`() { + val client = cluster.last().client + + val states = listOf(StateRef(SecureHash.randomSHA256(), 0), StateRef(SecureHash.randomSHA256(), 0)) + val txId: SecureHash = SecureHash.randomSHA256() + val requestingPartyName = ALICE_NAME + val requestSignature = ByteArray(1024) + val timeWindow = TimeWindow.fromOnly(Instant.MIN) + + val commitCommand = RaftTransactionCommitLog.Commands.CommitTransaction( + states, txId, requestingPartyName.toString(), requestSignature, timeWindow + ) + val commitError = client.submit(commitCommand).getOrThrow() + assertNull(commitError) + + val expiredTimeWindow = TimeWindow.untilOnly(Instant.MIN) + val commitCommand2 = RaftTransactionCommitLog.Commands.CommitTransaction( + states, txId, requestingPartyName.toString(), requestSignature, expiredTimeWindow + ) + val commitError2 = client.submit(commitCommand2).getOrThrow() + assertNull(commitError2) } private fun setUpCluster(nodeCount: Int = 3): List { diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index 11f5f83875..a5799598fa 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -35,11 +35,13 @@ import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity +import net.corda.testing.node.TestClock import net.corda.testing.node.internal.* import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test +import java.time.Duration import java.time.Instant import java.util.* import kotlin.test.assertEquals @@ -136,6 +138,30 @@ class ValidatingNotaryServiceTests { signatures.forEach { it.verify(stx.id) } } + @Test + fun `should re-sign a transaction with an expired time-window`() { + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + .setTimeWindow(Instant.now(), 30.seconds) + aliceNode.services.signInitialTransaction(tx) + } + + val sig1 = runNotaryClient(stx).getOrThrow().single() + assertEquals(sig1.by, notary.owningKey) + assertTrue(sig1.isValid(stx.id)) + + mockNet.nodes.forEach { + val nodeClock = (it.started!!.services.clock as TestClock) + nodeClock.advanceBy(Duration.ofDays(1)) + } + + val sig2 = runNotaryClient(stx).getOrThrow().single() + assertEquals(sig2.by, notary.owningKey) + } + @Test fun `should report error for transaction with an invalid time-window`() { val stx = run { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index cccfd447b1..77b0c36207 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -58,7 +58,7 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressSettings -import org.apache.activemq.artemis.spi.core.remoting.Connection +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import java.lang.reflect.Method import java.nio.file.Path @@ -155,11 +155,11 @@ fun rpcDriver( private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 { override fun validateUser(user: String?, password: String?) = isValid(user, password) override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?) = isValid(user, password) - override fun validateUser(user: String?, password: String?, connection: Connection?): String? { + override fun validateUser(user: String?, password: String?, connection: RemotingConnection?): String? { return validate(user, password) } - override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?, address: String?, connection: Connection?): String? { + override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet?, checkType: CheckType?, address: String?, connection: RemotingConnection?): String? { return validate(user, password) }