diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index c2bca61206..bef1d627cf 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -141,19 +141,21 @@ class NotaryFlow { */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic() { + companion object { + // TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder. + private const val maxAllowedInputs = 10_000 + } @Suspendable override fun call(): Void? { check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) { "We are not a notary on the network" } - val requestPayload = otherSideSession.receive().unwrap { it } var txId: SecureHash? = null try { val parts = validateRequest(requestPayload) txId = parts.id - checkNotary(parts.notary) service.validateTimeWindow(parts.timestamp) service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature) signTransactionAndSendResponse(txId) @@ -163,6 +165,16 @@ class NotaryFlow { return null } + /** Checks whether the number of input states is too large. */ + protected fun checkInputs(inputs: List) { + if (inputs.size > maxAllowedInputs) { + val error = NotaryError.TransactionInvalid( + IllegalArgumentException("A transaction cannot have more than $maxAllowedInputs inputs, received: ${inputs.size}") + ) + throw NotaryInternalException(error) + } + } + /** * Implement custom logic to perform transaction verification based on validity and privacy requirements. */ diff --git a/core/src/main/kotlin/net/corda/core/utilities/KotlinUtils.kt b/core/src/main/kotlin/net/corda/core/utilities/KotlinUtils.kt index fe4ec4362c..4f327a4a57 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/KotlinUtils.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/KotlinUtils.kt @@ -15,7 +15,6 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.CordaSerializable import org.slf4j.Logger import org.slf4j.LoggerFactory -import rx.Observable import java.time.Duration import java.util.concurrent.ExecutionException import java.util.concurrent.Future diff --git a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt index fa2e9c80e2..ce56bd2980 100644 --- a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt @@ -18,12 +18,10 @@ import net.corda.core.identity.Party import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.sequence -import net.corda.node.internal.StartedNode import net.corda.testing.contracts.DummyContract import net.corda.testing.core.singleIdentity -import net.corda.testing.node.internal.InternalMockNetwork -import net.corda.testing.node.internal.InternalMockNetwork.MockNode -import net.corda.testing.node.internal.startFlow +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.StartedMockNode import org.junit.After import org.junit.Before import org.junit.Test @@ -38,17 +36,17 @@ import kotlin.test.assertNull // DOCSTART 3 class ResolveTransactionsFlowTest { - private lateinit var mockNet: InternalMockNetwork - private lateinit var notaryNode: StartedNode - private lateinit var megaCorpNode: StartedNode - private lateinit var miniCorpNode: StartedNode + private lateinit var mockNet: MockNetwork + private lateinit var notaryNode: StartedMockNode + private lateinit var megaCorpNode: StartedMockNode + private lateinit var miniCorpNode: StartedMockNode private lateinit var megaCorp: Party private lateinit var miniCorp: Party private lateinit var notary: Party @Before fun setup() { - mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts", "net.corda.core.internal")) + mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts", "net.corda.core.internal")) notaryNode = mockNet.defaultNotaryNode megaCorpNode = mockNet.createPartyNode(CordaX500Name("MegaCorp", "London", "GB")) miniCorpNode = mockNet.createPartyNode(CordaX500Name("MiniCorp", "London", "GB")) @@ -68,11 +66,11 @@ class ResolveTransactionsFlowTest { fun `resolve from two hashes`() { val (stx1, stx2) = makeTransactions() val p = TestFlow(setOf(stx2.id), megaCorp) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - val results = future.resultFuture.getOrThrow() + val results = future.getOrThrow() assertEquals(listOf(stx1.id, stx2.id), results.map { it.id }) - miniCorpNode.database.transaction { + miniCorpNode.transaction { assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id)) assertEquals(stx2, miniCorpNode.services.validatedTransactions.getTransaction(stx2.id)) } @@ -83,19 +81,19 @@ class ResolveTransactionsFlowTest { fun `dependency with an error`() { val stx = makeTransactions(signFirstTX = false).second val p = TestFlow(setOf(stx.id), megaCorp) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - assertFailsWith(SignedTransaction.SignaturesMissingException::class) { future.resultFuture.getOrThrow() } + assertFailsWith(SignedTransaction.SignaturesMissingException::class) { future.getOrThrow() } } @Test fun `resolve from a signed transaction`() { val (stx1, stx2) = makeTransactions() val p = TestFlow(stx2, megaCorp) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - future.resultFuture.getOrThrow() - miniCorpNode.database.transaction { + future.getOrThrow() + miniCorpNode.transaction { assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id)) // But stx2 wasn't inserted, just stx1. assertNull(miniCorpNode.services.validatedTransactions.getTransaction(stx2.id)) @@ -111,15 +109,15 @@ class ResolveTransactionsFlowTest { repeat(count) { val builder = DummyContract.move(cursor.tx.outRef(0), miniCorp) val stx = megaCorpNode.services.signInitialTransaction(builder) - megaCorpNode.database.transaction { + megaCorpNode.transaction { megaCorpNode.services.recordTransactions(stx) } cursor = stx } val p = TestFlow(setOf(cursor.id), megaCorp, 40) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - assertFailsWith { future.resultFuture.getOrThrow() } + assertFailsWith { future.getOrThrow() } } @Test @@ -136,14 +134,14 @@ class ResolveTransactionsFlowTest { notaryNode.services.addSignature(ptx, notary.owningKey) } - megaCorpNode.database.transaction { + megaCorpNode.transaction { megaCorpNode.services.recordTransactions(stx2, stx3) } val p = TestFlow(setOf(stx3.id), megaCorp) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - future.resultFuture.getOrThrow() + future.getOrThrow() } @Test @@ -158,17 +156,17 @@ class ResolveTransactionsFlowTest { return bs.toByteArray().sequence().open() } // TODO: this operation should not require an explicit transaction - val id = megaCorpNode.database.transaction { + val id = megaCorpNode.transaction { megaCorpNode.services.attachments.importAttachment(makeJar()) } val stx2 = makeTransactions(withAttachment = id).second val p = TestFlow(stx2, megaCorp) - val future = miniCorpNode.services.startFlow(p) + val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - future.resultFuture.getOrThrow() + future.getOrThrow() // TODO: this operation should not require an explicit transaction - miniCorpNode.database.transaction { + miniCorpNode.transaction { assertNotNull(miniCorpNode.services.attachments.openAttachment(id)) } } @@ -193,7 +191,7 @@ class ResolveTransactionsFlowTest { val ptx = megaCorpNode.services.signInitialTransaction(it) notaryNode.services.addSignature(ptx, notary.owningKey) } - megaCorpNode.database.transaction { + megaCorpNode.transaction { megaCorpNode.services.recordTransactions(dummy1, dummy2) } return Pair(dummy1, dummy2) diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index afc8c3c700..1d57a53f89 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -95,6 +95,13 @@ absolute path to the node's base directory. here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, the node will try to auto-discover its public one. +:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is + resent to a different notary-replica round-robin in case of clustered notaries. + + :messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`. + :maxRetryCount: How many retries to attempt. + :backoffBase: The base of the exponential backoff, :math:`t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`. + :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block. :rpcSettings: Options for the RPC server. diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/Utilities.kt b/experimental/behave/src/main/kotlin/net/corda/behave/Utilities.kt index 1f689d5a96..89d8c389c3 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/Utilities.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/Utilities.kt @@ -13,26 +13,6 @@ package net.corda.behave import java.time.Duration import java.util.concurrent.CountDownLatch -// TODO Most of these are available in corda core - -val Int.millisecond: Duration - get() = Duration.ofMillis(this.toLong()) - -val Int.milliseconds: Duration - get() = Duration.ofMillis(this.toLong()) - -val Int.second: Duration - get() = Duration.ofSeconds(this.toLong()) - -val Int.seconds: Duration - get() = Duration.ofSeconds(this.toLong()) - -val Int.minute: Duration - get() = Duration.ofMinutes(this.toLong()) - -val Int.minutes: Duration - get() = Duration.ofMinutes(this.toLong()) - fun CountDownLatch.await(duration: Duration) = this.await(duration.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS) diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/monitoring/Watch.kt b/experimental/behave/src/main/kotlin/net/corda/behave/monitoring/Watch.kt index 48d7640633..aaed374eac 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/monitoring/Watch.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/monitoring/Watch.kt @@ -11,7 +11,7 @@ package net.corda.behave.monitoring import net.corda.behave.await -import net.corda.behave.seconds +import net.corda.core.utilities.seconds import rx.Observable import java.time.Duration import java.util.concurrent.CountDownLatch diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/network/Network.kt b/experimental/behave/src/main/kotlin/net/corda/behave/network/Network.kt index 7be4843759..0309449e67 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/network/Network.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/network/Network.kt @@ -14,14 +14,14 @@ import net.corda.behave.database.DatabaseType import net.corda.behave.file.LogSource import net.corda.behave.file.currentDirectory import net.corda.behave.file.stagingRoot -import net.corda.behave.logging.getLogger -import net.corda.behave.minutes import net.corda.behave.node.Distribution import net.corda.behave.node.Node import net.corda.behave.node.configuration.NotaryType import net.corda.behave.process.JarCommand import net.corda.core.CordaException import net.corda.core.internal.* +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.minutes import org.apache.commons.io.FileUtils import java.io.Closeable import java.nio.file.Path @@ -295,7 +295,7 @@ class Network private constructor( } companion object { - val log = getLogger() + val log = contextLogger() const val CLEANUP_ON_ERROR = false fun new(timeout: Duration = 2.minutes diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Distribution.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Distribution.kt index 88e72478e0..8a36b35226 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Distribution.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Distribution.kt @@ -11,12 +11,11 @@ package net.corda.behave.node import net.corda.behave.file.stagingRoot -import net.corda.behave.logging.getLogger -import net.corda.behave.service.Service import net.corda.core.internal.copyTo import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.internal.exists +import net.corda.core.utilities.contextLogger import java.net.URL import java.nio.file.Path @@ -92,7 +91,7 @@ class Distribution private constructor( companion object { - protected val log = getLogger() + private val log = contextLogger() private val distributions = mutableListOf() diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt index 2a3f8b4bc6..9d5221fcd7 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt @@ -15,11 +15,9 @@ import net.corda.behave.database.DatabaseType import net.corda.behave.file.LogSource import net.corda.behave.file.currentDirectory import net.corda.behave.file.stagingRoot -import net.corda.behave.logging.getLogger import net.corda.behave.monitoring.PatternWatch import net.corda.behave.node.configuration.* import net.corda.behave.process.JarCommand -import net.corda.behave.seconds import net.corda.behave.service.Service import net.corda.behave.service.ServiceSettings import net.corda.behave.ssh.MonitoringSSHClient @@ -30,6 +28,8 @@ import net.corda.core.internal.div import net.corda.core.internal.exists import net.corda.core.messaging.CordaRPCOps import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.seconds import org.apache.commons.io.FileUtils import java.net.InetAddress import java.nio.file.Path @@ -45,7 +45,7 @@ class Node( private val settings: ServiceSettings = ServiceSettings() ) { - private val log = getLogger() + private val log = loggerFor() private val runtimeDirectory = rootDirectory / config.name diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/configuration/Configuration.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/configuration/Configuration.kt index d0f37699c8..92816d7a02 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/configuration/Configuration.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/configuration/Configuration.kt @@ -11,10 +11,10 @@ package net.corda.behave.node.configuration import net.corda.behave.database.DatabaseType -import net.corda.behave.logging.getLogger import net.corda.behave.node.Distribution import net.corda.core.identity.CordaX500Name import net.corda.core.internal.writeText +import net.corda.core.utilities.contextLogger import java.nio.file.Path class Configuration( @@ -63,7 +63,7 @@ class Configuration( .joinToString("\n") companion object { - private val log = getLogger() + private val log = contextLogger() const val DEFAULT_PASSWORD = "S0meS3cretW0rd" } diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt b/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt index b5af2a2796..c0a2d4703c 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt @@ -10,10 +10,13 @@ package net.corda.behave.process -import net.corda.behave.* +import net.corda.behave.await import net.corda.behave.file.currentDirectory -import net.corda.behave.logging.getLogger import net.corda.behave.process.output.OutputListener +import net.corda.behave.waitFor +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import rx.Observable import rx.Subscriber import java.io.Closeable @@ -28,7 +31,7 @@ open class Command( private val timeout: Duration = 2.minutes ): Closeable { - protected val log = getLogger() + protected val log = loggerFor() private val terminationLatch = CountDownLatch(1) @@ -84,7 +87,7 @@ open class Command( }).start() val streamIsClosed = outputCapturedLatch.await(timeout) val timeout = if (!streamIsClosed || isInterrupted) { - 1.second + 1.seconds } else { timeout } diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/service/Service.kt b/experimental/behave/src/main/kotlin/net/corda/behave/service/Service.kt index 4440b99277..721018c3a0 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/service/Service.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/service/Service.kt @@ -10,7 +10,7 @@ package net.corda.behave.service -import net.corda.behave.logging.getLogger +import net.corda.core.utilities.loggerFor import java.io.Closeable abstract class Service( @@ -21,7 +21,7 @@ abstract class Service( private var isRunning: Boolean = false - protected val log = getLogger() + protected val log = loggerFor() fun start(): Boolean { if (isRunning) { diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/service/ServiceSettings.kt b/experimental/behave/src/main/kotlin/net/corda/behave/service/ServiceSettings.kt index 5928c6a81e..1e7e8d0419 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/service/ServiceSettings.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/service/ServiceSettings.kt @@ -10,14 +10,13 @@ package net.corda.behave.service -import net.corda.behave.minute -import net.corda.behave.second -import net.corda.behave.seconds +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import java.time.Duration data class ServiceSettings( - val timeout: Duration = 1.minute, - val startupDelay: Duration = 1.second, + val timeout: Duration = 1.minutes, + val startupDelay: Duration = 1.seconds, val startupTimeout: Duration = 15.seconds, - val pollInterval: Duration = 1.second + val pollInterval: Duration = 1.seconds ) diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/ssh/SSHClient.kt b/experimental/behave/src/main/kotlin/net/corda/behave/ssh/SSHClient.kt index 8cfd0e4d06..18706425ff 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/ssh/SSHClient.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/ssh/SSHClient.kt @@ -10,7 +10,7 @@ package net.corda.behave.ssh -import net.corda.behave.logging.getLogger +import net.corda.core.utilities.contextLogger import org.apache.sshd.client.SshClient import org.apache.sshd.client.channel.ChannelShell import org.apache.sshd.client.session.ClientSession @@ -106,7 +106,7 @@ open class SSHClient private constructor( companion object { - private val log = getLogger() + private val log = contextLogger() fun connect( port: Int, diff --git a/experimental/behave/src/test/kotlin/net/corda/behave/monitoring/MonitoringTests.kt b/experimental/behave/src/test/kotlin/net/corda/behave/monitoring/MonitoringTests.kt index 8cd4308f25..3fbee8e96a 100644 --- a/experimental/behave/src/test/kotlin/net/corda/behave/monitoring/MonitoringTests.kt +++ b/experimental/behave/src/test/kotlin/net/corda/behave/monitoring/MonitoringTests.kt @@ -10,7 +10,7 @@ package net.corda.behave.monitoring -import net.corda.behave.second +import net.corda.core.utilities.seconds import org.assertj.core.api.Assertions.assertThat import org.junit.Test import rx.Observable @@ -20,14 +20,14 @@ class MonitoringTests { @Test fun `watch gets triggered when pattern is observed`() { val observable = Observable.just("first", "second", "third") - val result = PatternWatch(observable, "c.n").await(1.second) + val result = PatternWatch(observable, "c.n").await(1.seconds) assertThat(result).isTrue() } @Test fun `watch does not get triggered when pattern is not observed`() { val observable = Observable.just("first", "second", "third") - val result = PatternWatch(observable, "forth").await(1.second) + val result = PatternWatch(observable, "forth").await(1.seconds) assertThat(result).isFalse() } @@ -38,7 +38,7 @@ class MonitoringTests { val watch2 = PatternWatch(observable, "ond") val watch3 = PatternWatch(observable, "ird") val aggregate = watch1 * watch2 * watch3 - assertThat(aggregate.await(1.second)).isTrue() + assertThat(aggregate.await(1.seconds)).isTrue() } @Test @@ -48,7 +48,7 @@ class MonitoringTests { val watch2 = PatternWatch(observable, "ond") val watch3 = PatternWatch(observable, "baz") val aggregate = watch1 * watch2 * watch3 - assertThat(aggregate.await(1.second)).isFalse() + assertThat(aggregate.await(1.seconds)).isFalse() } @Test @@ -58,7 +58,7 @@ class MonitoringTests { val watch2 = PatternWatch(observable, "ond") val watch3 = PatternWatch(observable, "bar") val aggregate = watch1 / watch2 / watch3 - assertThat(aggregate.await(1.second)).isTrue() + assertThat(aggregate.await(1.seconds)).isTrue() } @Test @@ -68,7 +68,7 @@ class MonitoringTests { val watch2 = PatternWatch(observable, "baz") val watch3 = PatternWatch(observable, "bar") val aggregate = watch1 / watch2 / watch3 - assertThat(aggregate.await(1.second)).isFalse() + assertThat(aggregate.await(1.seconds)).isFalse() } } \ No newline at end of file diff --git a/experimental/behave/src/test/kotlin/net/corda/behave/network/NetworkTests.kt b/experimental/behave/src/test/kotlin/net/corda/behave/network/NetworkTests.kt index 2064aa301c..38497cc0f5 100644 --- a/experimental/behave/src/test/kotlin/net/corda/behave/network/NetworkTests.kt +++ b/experimental/behave/src/test/kotlin/net/corda/behave/network/NetworkTests.kt @@ -12,7 +12,7 @@ package net.corda.behave.network import net.corda.behave.database.DatabaseType import net.corda.behave.node.configuration.NotaryType -import net.corda.behave.seconds +import net.corda.core.utilities.seconds import org.junit.Ignore import org.junit.Test diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt index 09dabfcda4..36f2c0d96b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt @@ -26,6 +26,7 @@ import java.net.Proxy import java.net.URL import java.nio.file.Path import java.nio.file.Paths +import java.time.Duration import java.time.Instant import java.time.LocalDate import java.time.temporal.Temporal @@ -116,6 +117,7 @@ private fun Config.getSingleValue(path: String, type: KType, strict: Boolean = t Double::class -> getDouble(path) Boolean::class -> getBoolean(path) LocalDate::class -> LocalDate.parse(getString(path)) + Duration::class -> getDuration(path) Instant::class -> Instant.parse(getString(path)) NetworkHostAndPort::class -> NetworkHostAndPort.parse(getString(path)) Path::class -> Paths.get(getString(path)) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 877ef2471f..ad1176bd15 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -139,7 +139,8 @@ class P2PMessagingTest : IntegrationTest() { } private fun DriverDSL.startAlice(): InProcess { - return startNode(providedName = ALICE_NAME, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)) + return startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to mapOf( + "messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3))) .map { (it as InProcess) } .getOrThrow() } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 49ef42a6c4..fa2efc5857 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -46,7 +46,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val compatibilityZoneURL: URL? val certificateChainCheckPolicies: List val verifierType: VerifierType - val messageRedeliveryDelaySeconds: Int + val p2pMessagingRetry: P2PMessagingRetryConfiguration val notary: NotaryConfig? val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort @@ -140,6 +140,18 @@ data class BFTSMaRtConfiguration( } } +/** + * Currently only used for notarisation requests. + * + * When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin + * in case of clustered notaries. + */ +data class P2PMessagingRetryConfiguration( + val messageRedeliveryDelay: Duration, + val maxRetryCount: Int, + val backoffBase: Double +) + fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs() data class NodeConfigurationImpl( @@ -155,9 +167,7 @@ data class NodeConfigurationImpl( override val rpcUsers: List, override val security: SecurityConfiguration? = null, override val verifierType: VerifierType, - // TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration. - // Then rename this to messageRedeliveryDelay and make it of type Duration - override val messageRedeliveryDelaySeconds: Int = 30, + override val p2pMessagingRetry: P2PMessagingRetryConfiguration, override val p2pAddress: NetworkHostAndPort, private val rpcAddress: NetworkHostAndPort? = null, private val rpcSettings: NodeRpcSettings, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index c7cc2ce71f..56127bbb85 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -115,7 +115,6 @@ class P2PMessagingClient(val config: NodeConfiguration, ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable { companion object { private val log = contextLogger() - private const val messageMaxRetryCount: Int = 3 fun createMessageToRedeliver(): PersistentMap, RetryMessage, Long> { return PersistentMap( @@ -143,6 +142,9 @@ class P2PMessagingClient(val config: NodeConfiguration, } } + private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount + private val backoffBase: Double = config.p2pMessagingRetry.backoffBase + private class InnerState { var started = false var running = false @@ -168,7 +170,7 @@ class P2PMessagingClient(val config: NodeConfiguration, data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) - private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() + private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds private val state = ThreadBox(InnerState()) private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) private val externalBridge: Boolean = config.enterpriseConfiguration.externalBridge ?: false @@ -551,7 +553,7 @@ class P2PMessagingClient(val config: NodeConfiguration, scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({ sendWithRetry(retryCount + 1, message, target, retryId) - }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) + }, messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble()).toLong(), TimeUnit.SECONDS) } override fun cancelRedelivery(retryId: Long) { diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index d965e1821a..799f87d89f 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -138,9 +138,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, 0 } else { // TODO This needs special handling (node omitted update process or didn't accept new parameters) - logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + - "advertising: ${networkMap.networkParameterHash}.\n" + - "Node will shutdown now. Please update node to use correct network parameters file.") + logger.error( + """Node is using network parameters with hash $currentParametersHash but the network map is advertising ${networkMap.networkParameterHash}. +To resolve this mismatch, and move to the current parameters, delete the $NETWORK_PARAMS_FILE_NAME file from the node's directory and restart. +The node will shutdown now.""") 1 } exitProcess(exitCode) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index 5cf926709f..e1c8fb7f23 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -32,9 +32,12 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut @Suspendable override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { val transaction = requestPayload.coreTransaction + checkInputs(transaction.inputs) val request = NotarisationRequest(transaction.inputs, transaction.id) validateRequestSignature(request, requestPayload.requestSignature) - return extractParts(transaction) + val parts = extractParts(transaction) + checkNotary(parts.notary) + return parts } private fun extractParts(tx: CoreTransaction): TransactionParts { @@ -45,8 +48,7 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut checkAllComponentsVisible(ComponentGroupEnum.INPUTS_GROUP) checkAllComponentsVisible(ComponentGroupEnum.TIMEWINDOW_GROUP) } - val notary = tx.notary - TransactionParts(tx.id, tx.inputs, tx.timeWindow, notary) + TransactionParts(tx.id, tx.inputs, tx.timeWindow, tx.notary) } is ContractUpgradeFilteredTransaction -> TransactionParts(tx.id, tx.inputs, null, tx.notary) is NotaryChangeWireTransaction -> TransactionParts(tx.id, tx.inputs, null, tx.notary) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index 1a0727cbda..45e75e315f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -37,6 +37,7 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { try { val stx = requestPayload.signedTransaction + checkInputs(stx.inputs) validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature) val notary = stx.notary checkNotary(notary) diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 4acee29bd0..0002cc528e 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -49,3 +49,8 @@ rpcSettings = { useSsl = false standAloneBroker = false } +p2pMessagingRetry { + messageRedeliveryDelay = 30 seconds + maxRetryCount = 3 + backoffBase = 2.0 +} diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 9bd2be6ab0..75c5aafa62 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -14,6 +14,7 @@ import com.zaxxer.hikari.HikariConfig import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag +import net.corda.core.utilities.seconds import net.corda.testing.core.ALICE_NAME import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.tools.shell.SSHDConfiguration @@ -114,6 +115,7 @@ class NodeConfigurationImplTest { verifierType = VerifierType.InMemory, p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, + p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0), notary = null, certificateChainCheckPolicies = emptyList(), devMode = true, diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 8b2a56dc5e..3634979edf 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -15,8 +15,13 @@ import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.generateKeyPair import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds import net.corda.node.internal.configureDatabase import net.corda.node.services.config.* +import net.corda.node.services.config.CertChainPolicyConfig +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.P2PMessagingRetryConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.PersistentUniquenessProvider @@ -80,8 +85,8 @@ class ArtemisMessagingTest { doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration + doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase=1.0)).whenever(it).p2pMessagingRetry } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock()) diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt index e76d3b4c9d..425bbc8836 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt @@ -10,39 +10,28 @@ package net.corda.node.services.transactions -import net.corda.core.concurrent.CordaFuture -import net.corda.core.contracts.StateAndRef +import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.StateRef import net.corda.core.crypto.* -import net.corda.core.flows.* +import net.corda.core.flows.NotaryException +import net.corda.core.flows.NotaryFlow import net.corda.core.identity.Party -import net.corda.core.internal.generateSignature -import net.corda.core.messaging.MessageRecipients +import net.corda.core.internal.NotaryChangeTransactionBuilder import net.corda.core.node.ServiceHub -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode -import net.corda.node.services.messaging.Message -import net.corda.node.services.statemachine.InitialSessionMessage -import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.dummyCommand +import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.singleIdentity -import net.corda.testing.node.internal.* -import org.assertj.core.api.Assertions.assertThat +import net.corda.testing.node.MockNetworkNotarySpec +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.startFlow import org.junit.After import org.junit.Before import org.junit.Test -import java.time.Instant -import java.util.* -import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertTrue class NotaryServiceTests { private lateinit var mockNet: InternalMockNetwork @@ -53,7 +42,10 @@ class NotaryServiceTests { @Before fun setup() { - mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts")) + mockNet = InternalMockNetwork( + cordappPackages = listOf("net.corda.testing.contracts"), + notarySpecs = listOf(MockNetworkNotarySpec(DUMMY_NOTARY_NAME, validating = false)) + ) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) notaryServices = mockNet.defaultNotaryNode.services //TODO get rid of that notary = mockNet.defaultNotaryIdentity @@ -66,201 +58,38 @@ class NotaryServiceTests { } @Test - fun `should sign a unique transaction with a valid time-window`() { - val stx = run { - val inputState = issueState(aliceNode.services, alice) - val tx = TransactionBuilder(notary) - .addInputState(inputState) - .addCommand(dummyCommand(alice.owningKey)) - .setTimeWindow(Instant.now(), 30.seconds) - aliceNode.services.signInitialTransaction(tx) - } - - val future = runNotaryClient(stx) - val signatures = future.getOrThrow() - signatures.forEach { it.verify(stx.id) } + fun `should reject a transaction with too many inputs`() { + notariseWithTooManyInputs(aliceNode, alice, notary, mockNet) } - @Test - fun `should sign a unique transaction without a time-window`() { - val stx = run { - val inputState = issueState(aliceNode.services, alice) - val tx = TransactionBuilder(notary) - .addInputState(inputState) - .addCommand(dummyCommand(alice.owningKey)) - aliceNode.services.signInitialTransaction(tx) + internal companion object { + /** This is used by both [NotaryServiceTests] and [ValidatingNotaryServiceTests]. */ + fun notariseWithTooManyInputs(node: StartedNode, party: Party, notary: Party, network: InternalMockNetwork) { + val stx = generateTransaction(node, party, notary) + + val future = node.services.startFlow(DummyClientFlow(stx, notary)).resultFuture + network.runNetwork() + assertFailsWith { future.getOrThrow() } } - val future = runNotaryClient(stx) - val signatures = future.getOrThrow() - signatures.forEach { it.verify(stx.id) } - } + private fun generateTransaction(node: StartedNode, party: Party, notary: Party): SignedTransaction { + val inputs = (1..10_005).map { StateRef(SecureHash.randomSHA256(), 0) } + val tx = NotaryChangeTransactionBuilder(inputs, notary, party).build() - @Test - fun `should report error for transaction with an invalid time-window`() { - val stx = run { - val inputState = issueState(aliceNode.services, alice) - val tx = TransactionBuilder(notary) - .addInputState(inputState) - .addCommand(dummyCommand(alice.owningKey)) - .setTimeWindow(Instant.now().plusSeconds(3600), 30.seconds) - aliceNode.services.signInitialTransaction(tx) - } - - val future = runNotaryClient(stx) - - val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } - assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) - } - - @Test - fun `should sign identical transaction multiple times (notarisation is idempotent)`() { - val stx = run { - val inputState = issueState(aliceNode.services, alice) - val tx = TransactionBuilder(notary) - .addInputState(inputState) - .addCommand(dummyCommand(alice.owningKey)) - aliceNode.services.signInitialTransaction(tx) - } - - val firstAttempt = NotaryFlow.Client(stx) - val secondAttempt = NotaryFlow.Client(stx) - val f1 = aliceNode.services.startFlow(firstAttempt).resultFuture - val f2 = aliceNode.services.startFlow(secondAttempt).resultFuture - - mockNet.runNetwork() - - // Note that the notary will only return identical signatures when using deterministic signature - // schemes (e.g. EdDSA) and when deterministic metadata is attached (no timestamps or nonces). - // We only really care that both signatures are over the same transaction and by the same notary. - val sig1 = f1.getOrThrow().single() - assertEquals(sig1.by, notary.owningKey) - assertTrue(sig1.isValid(stx.id)) - - val sig2 = f2.getOrThrow().single() - assertEquals(sig2.by, notary.owningKey) - assertTrue(sig2.isValid(stx.id)) - } - - @Test - fun `should report conflict when inputs are reused across transactions`() { - val firstState = issueState(aliceNode.services, alice) - val secondState = issueState(aliceNode.services, alice) - - fun spendState(state: StateAndRef<*>): SignedTransaction { - val stx = run { - val tx = TransactionBuilder(notary) - .addInputState(state) - .addCommand(dummyCommand(alice.owningKey)) - aliceNode.services.signInitialTransaction(tx) + return node.services.run { + val myKey = myInfo.legalIdentities.first().owningKey + val signableData = SignableData(tx.id, SignatureMetadata(myInfo.platformVersion, Crypto.findSignatureScheme(myKey).schemeNumberID)) + val mySignature = keyManagementService.sign(signableData, myKey) + SignedTransaction(tx, listOf(mySignature)) } - aliceNode.services.startFlow(NotaryFlow.Client(stx)) - mockNet.runNetwork() - return stx } - val firstSpendTx = spendState(firstState) - val secondSpendTx = spendState(secondState) - - val doubleSpendTx = run { - val tx = TransactionBuilder(notary) - .addInputState(issueState(aliceNode.services, alice)) - .addInputState(firstState) - .addInputState(secondState) - .addCommand(dummyCommand(alice.owningKey)) - aliceNode.services.signInitialTransaction(tx) - } - - val doubleSpend = NotaryFlow.Client(doubleSpendTx) // Double spend the inputState in a second transaction. - val future = aliceNode.services.startFlow(doubleSpend) - mockNet.runNetwork() - - val ex = assertFailsWith(NotaryException::class) { future.resultFuture.getOrThrow() } - val notaryError = ex.error as NotaryError.Conflict - assertEquals(notaryError.txId, doubleSpendTx.id) - with(notaryError) { - assertEquals(consumedStates.size, 2) - assertEquals(consumedStates[firstState.ref]!!.hashOfTransactionId, firstSpendTx.id.sha256()) - assertEquals(consumedStates[secondState.ref]!!.hashOfTransactionId, secondSpendTx.id.sha256()) - } - } - - @Test - fun `should reject when notarisation request not signed by the requesting party`() { - runNotarisationAndInterceptClientPayload { originalPayload -> - val transaction = originalPayload.signedTransaction - val randomKeyPair = Crypto.generateKeyPair() - val bytesToSign = NotarisationRequest(transaction.inputs, transaction.id).serialize().bytes - val modifiedSignature = NotarisationRequestSignature(randomKeyPair.sign(bytesToSign), aliceNode.services.myInfo.platformVersion) - originalPayload.copy(requestSignature = modifiedSignature) - } - } - - @Test - fun `should reject when incorrect notarisation request signed - inputs don't match`() { - runNotarisationAndInterceptClientPayload { originalPayload -> - val transaction = originalPayload.signedTransaction - val wrongInputs = listOf(StateRef(SecureHash.randomSHA256(), 0)) - val request = NotarisationRequest(wrongInputs, transaction.id) - val modifiedSignature = request.generateSignature(aliceNode.services) - originalPayload.copy(requestSignature = modifiedSignature) - } - } - - @Test - fun `should reject when incorrect notarisation request signed - transaction id doesn't match`() { - runNotarisationAndInterceptClientPayload { originalPayload -> - val transaction = originalPayload.signedTransaction - val wrongTransactionId = SecureHash.randomSHA256() - val request = NotarisationRequest(transaction.inputs, wrongTransactionId) - val modifiedSignature = request.generateSignature(aliceNode.services) - originalPayload.copy(requestSignature = modifiedSignature) - } - } - - private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) { - aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) { - override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { - val messageData = message.data.deserialize() as? InitialSessionMessage - val payload = messageData?.firstPayload!!.deserialize() - - if (payload is NotarisationPayload) { - val alteredPayload = payloadModifier(payload) - val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize()) - val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId) - messagingService.send(alteredMessage, target, retryId) - - } else { - messagingService.send(message, target, retryId) - } + private class DummyClientFlow(stx: SignedTransaction, val notary: Party) : NotaryFlow.Client(stx) { + @Suspendable + override fun call(): List { + notarise(notary) + throw UnsupportedOperationException() } - }) - - val stx = run { - val inputState = issueState(aliceNode.services, alice) - val tx = TransactionBuilder(notary) - .addInputState(inputState) - .addCommand(dummyCommand(alice.owningKey)) - aliceNode.services.signInitialTransaction(tx) } - - val future = runNotaryClient(stx) - val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } - assertThat(ex.error).isInstanceOf(NotaryError.RequestSignatureInvalid::class.java) - } - - private fun runNotaryClient(stx: SignedTransaction): CordaFuture> { - val flow = NotaryFlow.Client(stx) - val future = aliceNode.services.startFlow(flow).resultFuture - mockNet.runNetwork() - return future - } - - private fun issueState(services: ServiceHub, identity: Party): StateAndRef<*> { - val tx = DummyContract.generateInitial(Random().nextInt(), notary, identity.ref(0)) - val signedByNode = services.signInitialTransaction(tx) - val stx = notaryServices.addSignature(signedByNode, notary.owningKey) - services.recordTransactions(stx) - return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0)) } } diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index 4dfcc2192a..11f5f83875 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -14,43 +14,49 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.Command import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef -import net.corda.core.crypto.TransactionSignature -import net.corda.core.crypto.generateKeyPair -import net.corda.core.flows.NotaryError -import net.corda.core.flows.NotaryException -import net.corda.core.flows.NotaryFlow +import net.corda.core.crypto.* +import net.corda.core.flows.* import net.corda.core.identity.Party +import net.corda.core.internal.generateSignature +import net.corda.core.messaging.MessageRecipients import net.corda.core.node.ServiceHub +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.node.internal.StartedNode import net.corda.node.services.issueInvalidState +import net.corda.node.services.messaging.Message +import net.corda.node.services.statemachine.InitialSessionMessage import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity -import net.corda.testing.node.MockNetwork -import net.corda.testing.node.MockNodeParameters -import net.corda.testing.node.StartedMockNode +import net.corda.testing.node.internal.* import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test +import java.time.Instant import java.util.* import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertTrue class ValidatingNotaryServiceTests { - private lateinit var mockNet: MockNetwork - private lateinit var notaryNode: StartedMockNode - private lateinit var aliceNode: StartedMockNode + private lateinit var mockNet: InternalMockNetwork + private lateinit var notaryNode: StartedNode + private lateinit var aliceNode: StartedNode private lateinit var notary: Party private lateinit var alice: Party @Before fun setup() { - mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts")) - aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME)) + mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts")) + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) notaryNode = mockNet.defaultNotaryNode notary = mockNet.defaultNotaryIdentity alice = aliceNode.info.singleIdentity() @@ -71,7 +77,7 @@ class ValidatingNotaryServiceTests { aliceNode.services.signInitialTransaction(tx) } - val future = runClient(stx) + val future = runNotaryClient(stx) val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } val notaryError = ex.error as NotaryError.TransactionInvalid @@ -92,16 +98,205 @@ class ValidatingNotaryServiceTests { // Expecting SignaturesMissingException instead of NotaryException, since the exception should originate from // the client flow. val ex = assertFailsWith { - val future = runClient(stx) + val future = runNotaryClient(stx) future.getOrThrow() } val missingKeys = ex.missing assertEquals(setOf(expectedMissingKey), missingKeys) } - private fun runClient(stx: SignedTransaction): CordaFuture> { + @Test + fun `should sign a unique transaction with a valid time-window`() { + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + .setTimeWindow(Instant.now(), 30.seconds) + aliceNode.services.signInitialTransaction(tx) + } + + val future = runNotaryClient(stx) + val signatures = future.getOrThrow() + signatures.forEach { it.verify(stx.id) } + } + + @Test + fun `should sign a unique transaction without a time-window`() { + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + aliceNode.services.signInitialTransaction(tx) + } + + val future = runNotaryClient(stx) + val signatures = future.getOrThrow() + signatures.forEach { it.verify(stx.id) } + } + + @Test + fun `should report error for transaction with an invalid time-window`() { + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + .setTimeWindow(Instant.now().plusSeconds(3600), 30.seconds) + aliceNode.services.signInitialTransaction(tx) + } + + val future = runNotaryClient(stx) + + val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } + assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) + } + + @Test + fun `should sign identical transaction multiple times (notarisation is idempotent)`() { + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + aliceNode.services.signInitialTransaction(tx) + } + + val firstAttempt = NotaryFlow.Client(stx) + val secondAttempt = NotaryFlow.Client(stx) + val f1 = aliceNode.services.startFlow(firstAttempt).resultFuture + val f2 = aliceNode.services.startFlow(secondAttempt).resultFuture + + mockNet.runNetwork() + + // Note that the notary will only return identical signatures when using deterministic signature + // schemes (e.g. EdDSA) and when deterministic metadata is attached (no timestamps or nonces). + // We only really care that both signatures are over the same transaction and by the same notary. + val sig1 = f1.getOrThrow().single() + assertEquals(sig1.by, notary.owningKey) + assertTrue(sig1.isValid(stx.id)) + + val sig2 = f2.getOrThrow().single() + assertEquals(sig2.by, notary.owningKey) + assertTrue(sig2.isValid(stx.id)) + } + + @Test + fun `should report conflict when inputs are reused across transactions`() { + val firstState = issueState(aliceNode.services, alice) + val secondState = issueState(aliceNode.services, alice) + + fun spendState(state: StateAndRef<*>): SignedTransaction { + val stx = run { + val tx = TransactionBuilder(notary) + .addInputState(state) + .addCommand(dummyCommand(alice.owningKey)) + aliceNode.services.signInitialTransaction(tx) + } + aliceNode.services.startFlow(NotaryFlow.Client(stx)) + mockNet.runNetwork() + return stx + } + + val firstSpendTx = spendState(firstState) + val secondSpendTx = spendState(secondState) + + val doubleSpendTx = run { + val tx = TransactionBuilder(notary) + .addInputState(issueState(aliceNode.services, alice)) + .addInputState(firstState) + .addInputState(secondState) + .addCommand(dummyCommand(alice.owningKey)) + aliceNode.services.signInitialTransaction(tx) + } + + val doubleSpend = NotaryFlow.Client(doubleSpendTx) // Double spend the inputState in a second transaction. + val future = aliceNode.services.startFlow(doubleSpend) + mockNet.runNetwork() + + val ex = assertFailsWith(NotaryException::class) { future.resultFuture.getOrThrow() } + val notaryError = ex.error as NotaryError.Conflict + assertEquals(notaryError.txId, doubleSpendTx.id) + with(notaryError) { + assertEquals(consumedStates.size, 2) + assertEquals(consumedStates[firstState.ref]!!.hashOfTransactionId, firstSpendTx.id.sha256()) + assertEquals(consumedStates[secondState.ref]!!.hashOfTransactionId, secondSpendTx.id.sha256()) + } + } + + @Test + fun `should reject when notarisation request not signed by the requesting party`() { + runNotarisationAndInterceptClientPayload { originalPayload -> + val transaction = originalPayload.signedTransaction + val randomKeyPair = Crypto.generateKeyPair() + val bytesToSign = NotarisationRequest(transaction.inputs, transaction.id).serialize().bytes + val modifiedSignature = NotarisationRequestSignature(randomKeyPair.sign(bytesToSign), aliceNode.services.myInfo.platformVersion) + originalPayload.copy(requestSignature = modifiedSignature) + } + } + + @Test + fun `should reject when incorrect notarisation request signed - inputs don't match`() { + runNotarisationAndInterceptClientPayload { originalPayload -> + val transaction = originalPayload.signedTransaction + val wrongInputs = listOf(StateRef(SecureHash.randomSHA256(), 0)) + val request = NotarisationRequest(wrongInputs, transaction.id) + val modifiedSignature = request.generateSignature(aliceNode.services) + originalPayload.copy(requestSignature = modifiedSignature) + } + } + + @Test + fun `should reject when incorrect notarisation request signed - transaction id doesn't match`() { + runNotarisationAndInterceptClientPayload { originalPayload -> + val transaction = originalPayload.signedTransaction + val wrongTransactionId = SecureHash.randomSHA256() + val request = NotarisationRequest(transaction.inputs, wrongTransactionId) + val modifiedSignature = request.generateSignature(aliceNode.services) + originalPayload.copy(requestSignature = modifiedSignature) + } + } + + @Test + fun `should reject a transaction with too many inputs`() { + NotaryServiceTests.notariseWithTooManyInputs(aliceNode, alice, notary, mockNet) + } + + private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) { + aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) { + override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { + val messageData = message.data.deserialize() as? InitialSessionMessage + val payload = messageData?.firstPayload!!.deserialize() + + if (payload is NotarisationPayload) { + val alteredPayload = payloadModifier(payload) + val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize()) + val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId) + messagingService.send(alteredMessage, target, retryId) + + } else { + messagingService.send(message, target, retryId) + } + } + }) + + val stx = run { + val inputState = issueState(aliceNode.services, alice) + val tx = TransactionBuilder(notary) + .addInputState(inputState) + .addCommand(dummyCommand(alice.owningKey)) + aliceNode.services.signInitialTransaction(tx) + } + + val future = runNotaryClient(stx) + val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } + assertThat(ex.error).isInstanceOf(NotaryError.RequestSignatureInvalid::class.java) + } + + private fun runNotaryClient(stx: SignedTransaction): CordaFuture> { val flow = NotaryFlow.Client(stx) - val future = aliceNode.startFlow(flow) + val future = aliceNode.services.startFlow(flow).resultFuture mockNet.runNetwork() return future } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 61c27a6d6e..437bf2927e 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -473,7 +473,7 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(null).whenever(it).compatibilityZoneURL doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(VerifierType.InMemory).whenever(it).verifierType - doReturn(5).whenever(it).messageRedeliveryDelaySeconds + doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions doReturn(EnterpriseConfiguration(