mirror of
https://github.com/corda/corda.git
synced 2025-02-04 18:22:29 +00:00
commit
01a07481ad
@ -141,19 +141,21 @@ class NotaryFlow {
|
||||
*/
|
||||
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
|
||||
abstract class Service(val otherSideSession: FlowSession, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
|
||||
companion object {
|
||||
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
|
||||
private const val maxAllowedInputs = 10_000
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Void? {
|
||||
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
|
||||
"We are not a notary on the network"
|
||||
}
|
||||
|
||||
val requestPayload = otherSideSession.receive<NotarisationPayload>().unwrap { it }
|
||||
var txId: SecureHash? = null
|
||||
try {
|
||||
val parts = validateRequest(requestPayload)
|
||||
txId = parts.id
|
||||
checkNotary(parts.notary)
|
||||
service.validateTimeWindow(parts.timestamp)
|
||||
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature)
|
||||
signTransactionAndSendResponse(txId)
|
||||
@ -163,6 +165,16 @@ class NotaryFlow {
|
||||
return null
|
||||
}
|
||||
|
||||
/** Checks whether the number of input states is too large. */
|
||||
protected fun checkInputs(inputs: List<StateRef>) {
|
||||
if (inputs.size > maxAllowedInputs) {
|
||||
val error = NotaryError.TransactionInvalid(
|
||||
IllegalArgumentException("A transaction cannot have more than $maxAllowedInputs inputs, received: ${inputs.size}")
|
||||
)
|
||||
throw NotaryInternalException(error)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement custom logic to perform transaction verification based on validity and privacy requirements.
|
||||
*/
|
||||
|
@ -15,7 +15,6 @@ import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Future
|
||||
|
@ -18,12 +18,10 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.StartedMockNode
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -38,17 +36,17 @@ import kotlin.test.assertNull
|
||||
|
||||
// DOCSTART 3
|
||||
class ResolveTransactionsFlowTest {
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var notaryNode: StartedNode<MockNode>
|
||||
private lateinit var megaCorpNode: StartedNode<MockNode>
|
||||
private lateinit var miniCorpNode: StartedNode<MockNode>
|
||||
private lateinit var mockNet: MockNetwork
|
||||
private lateinit var notaryNode: StartedMockNode
|
||||
private lateinit var megaCorpNode: StartedMockNode
|
||||
private lateinit var miniCorpNode: StartedMockNode
|
||||
private lateinit var megaCorp: Party
|
||||
private lateinit var miniCorp: Party
|
||||
private lateinit var notary: Party
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts", "net.corda.core.internal"))
|
||||
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts", "net.corda.core.internal"))
|
||||
notaryNode = mockNet.defaultNotaryNode
|
||||
megaCorpNode = mockNet.createPartyNode(CordaX500Name("MegaCorp", "London", "GB"))
|
||||
miniCorpNode = mockNet.createPartyNode(CordaX500Name("MiniCorp", "London", "GB"))
|
||||
@ -68,11 +66,11 @@ class ResolveTransactionsFlowTest {
|
||||
fun `resolve from two hashes`() {
|
||||
val (stx1, stx2) = makeTransactions()
|
||||
val p = TestFlow(setOf(stx2.id), megaCorp)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
val results = future.resultFuture.getOrThrow()
|
||||
val results = future.getOrThrow()
|
||||
assertEquals(listOf(stx1.id, stx2.id), results.map { it.id })
|
||||
miniCorpNode.database.transaction {
|
||||
miniCorpNode.transaction {
|
||||
assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id))
|
||||
assertEquals(stx2, miniCorpNode.services.validatedTransactions.getTransaction(stx2.id))
|
||||
}
|
||||
@ -83,19 +81,19 @@ class ResolveTransactionsFlowTest {
|
||||
fun `dependency with an error`() {
|
||||
val stx = makeTransactions(signFirstTX = false).second
|
||||
val p = TestFlow(setOf(stx.id), megaCorp)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
assertFailsWith(SignedTransaction.SignaturesMissingException::class) { future.resultFuture.getOrThrow() }
|
||||
assertFailsWith(SignedTransaction.SignaturesMissingException::class) { future.getOrThrow() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `resolve from a signed transaction`() {
|
||||
val (stx1, stx2) = makeTransactions()
|
||||
val p = TestFlow(stx2, megaCorp)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
future.resultFuture.getOrThrow()
|
||||
miniCorpNode.database.transaction {
|
||||
future.getOrThrow()
|
||||
miniCorpNode.transaction {
|
||||
assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id))
|
||||
// But stx2 wasn't inserted, just stx1.
|
||||
assertNull(miniCorpNode.services.validatedTransactions.getTransaction(stx2.id))
|
||||
@ -111,15 +109,15 @@ class ResolveTransactionsFlowTest {
|
||||
repeat(count) {
|
||||
val builder = DummyContract.move(cursor.tx.outRef(0), miniCorp)
|
||||
val stx = megaCorpNode.services.signInitialTransaction(builder)
|
||||
megaCorpNode.database.transaction {
|
||||
megaCorpNode.transaction {
|
||||
megaCorpNode.services.recordTransactions(stx)
|
||||
}
|
||||
cursor = stx
|
||||
}
|
||||
val p = TestFlow(setOf(cursor.id), megaCorp, 40)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
assertFailsWith<ResolveTransactionsFlow.ExcessivelyLargeTransactionGraph> { future.resultFuture.getOrThrow() }
|
||||
assertFailsWith<ResolveTransactionsFlow.ExcessivelyLargeTransactionGraph> { future.getOrThrow() }
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -136,14 +134,14 @@ class ResolveTransactionsFlowTest {
|
||||
notaryNode.services.addSignature(ptx, notary.owningKey)
|
||||
}
|
||||
|
||||
megaCorpNode.database.transaction {
|
||||
megaCorpNode.transaction {
|
||||
megaCorpNode.services.recordTransactions(stx2, stx3)
|
||||
}
|
||||
|
||||
val p = TestFlow(setOf(stx3.id), megaCorp)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
future.resultFuture.getOrThrow()
|
||||
future.getOrThrow()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -158,17 +156,17 @@ class ResolveTransactionsFlowTest {
|
||||
return bs.toByteArray().sequence().open()
|
||||
}
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
val id = megaCorpNode.database.transaction {
|
||||
val id = megaCorpNode.transaction {
|
||||
megaCorpNode.services.attachments.importAttachment(makeJar())
|
||||
}
|
||||
val stx2 = makeTransactions(withAttachment = id).second
|
||||
val p = TestFlow(stx2, megaCorp)
|
||||
val future = miniCorpNode.services.startFlow(p)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
future.resultFuture.getOrThrow()
|
||||
future.getOrThrow()
|
||||
|
||||
// TODO: this operation should not require an explicit transaction
|
||||
miniCorpNode.database.transaction {
|
||||
miniCorpNode.transaction {
|
||||
assertNotNull(miniCorpNode.services.attachments.openAttachment(id))
|
||||
}
|
||||
}
|
||||
@ -193,7 +191,7 @@ class ResolveTransactionsFlowTest {
|
||||
val ptx = megaCorpNode.services.signInitialTransaction(it)
|
||||
notaryNode.services.addSignature(ptx, notary.owningKey)
|
||||
}
|
||||
megaCorpNode.database.transaction {
|
||||
megaCorpNode.transaction {
|
||||
megaCorpNode.services.recordTransactions(dummy1, dummy2)
|
||||
}
|
||||
return Pair(dummy1, dummy2)
|
||||
|
@ -95,6 +95,13 @@ absolute path to the node's base directory.
|
||||
here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
|
||||
the node will try to auto-discover its public one.
|
||||
|
||||
:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is
|
||||
resent to a different notary-replica round-robin in case of clustered notaries.
|
||||
|
||||
:messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`.
|
||||
:maxRetryCount: How many retries to attempt.
|
||||
:backoffBase: The base of the exponential backoff, :math:`t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`.
|
||||
|
||||
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.
|
||||
|
||||
:rpcSettings: Options for the RPC server.
|
||||
|
@ -13,26 +13,6 @@ package net.corda.behave
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
// TODO Most of these are available in corda core
|
||||
|
||||
val Int.millisecond: Duration
|
||||
get() = Duration.ofMillis(this.toLong())
|
||||
|
||||
val Int.milliseconds: Duration
|
||||
get() = Duration.ofMillis(this.toLong())
|
||||
|
||||
val Int.second: Duration
|
||||
get() = Duration.ofSeconds(this.toLong())
|
||||
|
||||
val Int.seconds: Duration
|
||||
get() = Duration.ofSeconds(this.toLong())
|
||||
|
||||
val Int.minute: Duration
|
||||
get() = Duration.ofMinutes(this.toLong())
|
||||
|
||||
val Int.minutes: Duration
|
||||
get() = Duration.ofMinutes(this.toLong())
|
||||
|
||||
fun CountDownLatch.await(duration: Duration) =
|
||||
this.await(duration.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS)
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
package net.corda.behave.monitoring
|
||||
|
||||
import net.corda.behave.await
|
||||
import net.corda.behave.seconds
|
||||
import net.corda.core.utilities.seconds
|
||||
import rx.Observable
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
@ -14,14 +14,14 @@ import net.corda.behave.database.DatabaseType
|
||||
import net.corda.behave.file.LogSource
|
||||
import net.corda.behave.file.currentDirectory
|
||||
import net.corda.behave.file.stagingRoot
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.minutes
|
||||
import net.corda.behave.node.Distribution
|
||||
import net.corda.behave.node.Node
|
||||
import net.corda.behave.node.configuration.NotaryType
|
||||
import net.corda.behave.process.JarCommand
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.minutes
|
||||
import org.apache.commons.io.FileUtils
|
||||
import java.io.Closeable
|
||||
import java.nio.file.Path
|
||||
@ -295,7 +295,7 @@ class Network private constructor(
|
||||
}
|
||||
|
||||
companion object {
|
||||
val log = getLogger<Network>()
|
||||
val log = contextLogger()
|
||||
const val CLEANUP_ON_ERROR = false
|
||||
|
||||
fun new(timeout: Duration = 2.minutes
|
||||
|
@ -11,12 +11,11 @@
|
||||
package net.corda.behave.node
|
||||
|
||||
import net.corda.behave.file.stagingRoot
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.service.Service
|
||||
import net.corda.core.internal.copyTo
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
|
||||
@ -92,7 +91,7 @@ class Distribution private constructor(
|
||||
|
||||
companion object {
|
||||
|
||||
protected val log = getLogger<Service>()
|
||||
private val log = contextLogger()
|
||||
|
||||
private val distributions = mutableListOf<Distribution>()
|
||||
|
||||
|
@ -15,11 +15,9 @@ import net.corda.behave.database.DatabaseType
|
||||
import net.corda.behave.file.LogSource
|
||||
import net.corda.behave.file.currentDirectory
|
||||
import net.corda.behave.file.stagingRoot
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.monitoring.PatternWatch
|
||||
import net.corda.behave.node.configuration.*
|
||||
import net.corda.behave.process.JarCommand
|
||||
import net.corda.behave.seconds
|
||||
import net.corda.behave.service.Service
|
||||
import net.corda.behave.service.ServiceSettings
|
||||
import net.corda.behave.ssh.MonitoringSSHClient
|
||||
@ -30,6 +28,8 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.seconds
|
||||
import org.apache.commons.io.FileUtils
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
@ -45,7 +45,7 @@ class Node(
|
||||
private val settings: ServiceSettings = ServiceSettings()
|
||||
) {
|
||||
|
||||
private val log = getLogger<Node>()
|
||||
private val log = loggerFor<Node>()
|
||||
|
||||
private val runtimeDirectory = rootDirectory / config.name
|
||||
|
||||
|
@ -11,10 +11,10 @@
|
||||
package net.corda.behave.node.configuration
|
||||
|
||||
import net.corda.behave.database.DatabaseType
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.node.Distribution
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.writeText
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import java.nio.file.Path
|
||||
|
||||
class Configuration(
|
||||
@ -63,7 +63,7 @@ class Configuration(
|
||||
.joinToString("\n")
|
||||
|
||||
companion object {
|
||||
private val log = getLogger<Configuration>()
|
||||
private val log = contextLogger()
|
||||
const val DEFAULT_PASSWORD = "S0meS3cretW0rd"
|
||||
}
|
||||
|
||||
|
@ -10,10 +10,13 @@
|
||||
|
||||
package net.corda.behave.process
|
||||
|
||||
import net.corda.behave.*
|
||||
import net.corda.behave.await
|
||||
import net.corda.behave.file.currentDirectory
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.behave.process.output.OutputListener
|
||||
import net.corda.behave.waitFor
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import java.io.Closeable
|
||||
@ -28,7 +31,7 @@ open class Command(
|
||||
private val timeout: Duration = 2.minutes
|
||||
): Closeable {
|
||||
|
||||
protected val log = getLogger<Command>()
|
||||
protected val log = loggerFor<Command>()
|
||||
|
||||
private val terminationLatch = CountDownLatch(1)
|
||||
|
||||
@ -84,7 +87,7 @@ open class Command(
|
||||
}).start()
|
||||
val streamIsClosed = outputCapturedLatch.await(timeout)
|
||||
val timeout = if (!streamIsClosed || isInterrupted) {
|
||||
1.second
|
||||
1.seconds
|
||||
} else {
|
||||
timeout
|
||||
}
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
package net.corda.behave.service
|
||||
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import java.io.Closeable
|
||||
|
||||
abstract class Service(
|
||||
@ -21,7 +21,7 @@ abstract class Service(
|
||||
|
||||
private var isRunning: Boolean = false
|
||||
|
||||
protected val log = getLogger<Service>()
|
||||
protected val log = loggerFor<Service>()
|
||||
|
||||
fun start(): Boolean {
|
||||
if (isRunning) {
|
||||
|
@ -10,14 +10,13 @@
|
||||
|
||||
package net.corda.behave.service
|
||||
|
||||
import net.corda.behave.minute
|
||||
import net.corda.behave.second
|
||||
import net.corda.behave.seconds
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import java.time.Duration
|
||||
|
||||
data class ServiceSettings(
|
||||
val timeout: Duration = 1.minute,
|
||||
val startupDelay: Duration = 1.second,
|
||||
val timeout: Duration = 1.minutes,
|
||||
val startupDelay: Duration = 1.seconds,
|
||||
val startupTimeout: Duration = 15.seconds,
|
||||
val pollInterval: Duration = 1.second
|
||||
val pollInterval: Duration = 1.seconds
|
||||
)
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
package net.corda.behave.ssh
|
||||
|
||||
import net.corda.behave.logging.getLogger
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.sshd.client.SshClient
|
||||
import org.apache.sshd.client.channel.ChannelShell
|
||||
import org.apache.sshd.client.session.ClientSession
|
||||
@ -106,7 +106,7 @@ open class SSHClient private constructor(
|
||||
|
||||
companion object {
|
||||
|
||||
private val log = getLogger<SSHClient>()
|
||||
private val log = contextLogger()
|
||||
|
||||
fun connect(
|
||||
port: Int,
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
package net.corda.behave.monitoring
|
||||
|
||||
import net.corda.behave.second
|
||||
import net.corda.core.utilities.seconds
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
@ -20,14 +20,14 @@ class MonitoringTests {
|
||||
@Test
|
||||
fun `watch gets triggered when pattern is observed`() {
|
||||
val observable = Observable.just("first", "second", "third")
|
||||
val result = PatternWatch(observable, "c.n").await(1.second)
|
||||
val result = PatternWatch(observable, "c.n").await(1.seconds)
|
||||
assertThat(result).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `watch does not get triggered when pattern is not observed`() {
|
||||
val observable = Observable.just("first", "second", "third")
|
||||
val result = PatternWatch(observable, "forth").await(1.second)
|
||||
val result = PatternWatch(observable, "forth").await(1.seconds)
|
||||
assertThat(result).isFalse()
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ class MonitoringTests {
|
||||
val watch2 = PatternWatch(observable, "ond")
|
||||
val watch3 = PatternWatch(observable, "ird")
|
||||
val aggregate = watch1 * watch2 * watch3
|
||||
assertThat(aggregate.await(1.second)).isTrue()
|
||||
assertThat(aggregate.await(1.seconds)).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -48,7 +48,7 @@ class MonitoringTests {
|
||||
val watch2 = PatternWatch(observable, "ond")
|
||||
val watch3 = PatternWatch(observable, "baz")
|
||||
val aggregate = watch1 * watch2 * watch3
|
||||
assertThat(aggregate.await(1.second)).isFalse()
|
||||
assertThat(aggregate.await(1.seconds)).isFalse()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -58,7 +58,7 @@ class MonitoringTests {
|
||||
val watch2 = PatternWatch(observable, "ond")
|
||||
val watch3 = PatternWatch(observable, "bar")
|
||||
val aggregate = watch1 / watch2 / watch3
|
||||
assertThat(aggregate.await(1.second)).isTrue()
|
||||
assertThat(aggregate.await(1.seconds)).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -68,7 +68,7 @@ class MonitoringTests {
|
||||
val watch2 = PatternWatch(observable, "baz")
|
||||
val watch3 = PatternWatch(observable, "bar")
|
||||
val aggregate = watch1 / watch2 / watch3
|
||||
assertThat(aggregate.await(1.second)).isFalse()
|
||||
assertThat(aggregate.await(1.seconds)).isFalse()
|
||||
}
|
||||
|
||||
}
|
@ -12,7 +12,7 @@ package net.corda.behave.network
|
||||
|
||||
import net.corda.behave.database.DatabaseType
|
||||
import net.corda.behave.node.configuration.NotaryType
|
||||
import net.corda.behave.seconds
|
||||
import net.corda.core.utilities.seconds
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
|
||||
|
@ -26,6 +26,7 @@ import java.net.Proxy
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.temporal.Temporal
|
||||
@ -116,6 +117,7 @@ private fun Config.getSingleValue(path: String, type: KType, strict: Boolean = t
|
||||
Double::class -> getDouble(path)
|
||||
Boolean::class -> getBoolean(path)
|
||||
LocalDate::class -> LocalDate.parse(getString(path))
|
||||
Duration::class -> getDuration(path)
|
||||
Instant::class -> Instant.parse(getString(path))
|
||||
NetworkHostAndPort::class -> NetworkHostAndPort.parse(getString(path))
|
||||
Path::class -> Paths.get(getString(path))
|
||||
|
@ -139,7 +139,8 @@ class P2PMessagingTest : IntegrationTest() {
|
||||
}
|
||||
|
||||
private fun DriverDSL.startAlice(): InProcess {
|
||||
return startNode(providedName = ALICE_NAME, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
|
||||
return startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to mapOf(
|
||||
"messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3)))
|
||||
.map { (it as InProcess) }
|
||||
.getOrThrow()
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val compatibilityZoneURL: URL?
|
||||
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
|
||||
val verifierType: VerifierType
|
||||
val messageRedeliveryDelaySeconds: Int
|
||||
val p2pMessagingRetry: P2PMessagingRetryConfiguration
|
||||
val notary: NotaryConfig?
|
||||
val additionalNodeInfoPollingFrequencyMsec: Long
|
||||
val p2pAddress: NetworkHostAndPort
|
||||
@ -140,6 +140,18 @@ data class BFTSMaRtConfiguration(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Currently only used for notarisation requests.
|
||||
*
|
||||
* When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin
|
||||
* in case of clustered notaries.
|
||||
*/
|
||||
data class P2PMessagingRetryConfiguration(
|
||||
val messageRedeliveryDelay: Duration,
|
||||
val maxRetryCount: Int,
|
||||
val backoffBase: Double
|
||||
)
|
||||
|
||||
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
|
||||
|
||||
data class NodeConfigurationImpl(
|
||||
@ -155,9 +167,7 @@ data class NodeConfigurationImpl(
|
||||
override val rpcUsers: List<User>,
|
||||
override val security: SecurityConfiguration? = null,
|
||||
override val verifierType: VerifierType,
|
||||
// TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration.
|
||||
// Then rename this to messageRedeliveryDelay and make it of type Duration
|
||||
override val messageRedeliveryDelaySeconds: Int = 30,
|
||||
override val p2pMessagingRetry: P2PMessagingRetryConfiguration,
|
||||
override val p2pAddress: NetworkHostAndPort,
|
||||
private val rpcAddress: NetworkHostAndPort? = null,
|
||||
private val rpcSettings: NodeRpcSettings,
|
||||
|
@ -115,7 +115,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
private const val messageMaxRetryCount: Int = 3
|
||||
|
||||
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
|
||||
return PersistentMap(
|
||||
@ -143,6 +142,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount
|
||||
private val backoffBase: Double = config.p2pMessagingRetry.backoffBase
|
||||
|
||||
private class InnerState {
|
||||
var started = false
|
||||
var running = false
|
||||
@ -168,7 +170,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration
|
||||
|
||||
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
|
||||
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
||||
private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
|
||||
private val externalBridge: Boolean = config.enterpriseConfiguration.externalBridge ?: false
|
||||
@ -551,7 +553,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
|
||||
scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({
|
||||
sendWithRetry(retryCount + 1, message, target, retryId)
|
||||
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
|
||||
}, messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble()).toLong(), TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
override fun cancelRedelivery(retryId: Long) {
|
||||
|
@ -138,9 +138,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
0
|
||||
} else {
|
||||
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Node will shutdown now. Please update node to use correct network parameters file.")
|
||||
logger.error(
|
||||
"""Node is using network parameters with hash $currentParametersHash but the network map is advertising ${networkMap.networkParameterHash}.
|
||||
To resolve this mismatch, and move to the current parameters, delete the $NETWORK_PARAMS_FILE_NAME file from the node's directory and restart.
|
||||
The node will shutdown now.""")
|
||||
1
|
||||
}
|
||||
exitProcess(exitCode)
|
||||
|
@ -32,9 +32,12 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut
|
||||
@Suspendable
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
val transaction = requestPayload.coreTransaction
|
||||
checkInputs(transaction.inputs)
|
||||
val request = NotarisationRequest(transaction.inputs, transaction.id)
|
||||
validateRequestSignature(request, requestPayload.requestSignature)
|
||||
return extractParts(transaction)
|
||||
val parts = extractParts(transaction)
|
||||
checkNotary(parts.notary)
|
||||
return parts
|
||||
}
|
||||
|
||||
private fun extractParts(tx: CoreTransaction): TransactionParts {
|
||||
@ -45,8 +48,7 @@ class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAut
|
||||
checkAllComponentsVisible(ComponentGroupEnum.INPUTS_GROUP)
|
||||
checkAllComponentsVisible(ComponentGroupEnum.TIMEWINDOW_GROUP)
|
||||
}
|
||||
val notary = tx.notary
|
||||
TransactionParts(tx.id, tx.inputs, tx.timeWindow, notary)
|
||||
TransactionParts(tx.id, tx.inputs, tx.timeWindow, tx.notary)
|
||||
}
|
||||
is ContractUpgradeFilteredTransaction -> TransactionParts(tx.id, tx.inputs, null, tx.notary)
|
||||
is NotaryChangeWireTransaction -> TransactionParts(tx.id, tx.inputs, null, tx.notary)
|
||||
|
@ -37,6 +37,7 @@ class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthor
|
||||
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
|
||||
try {
|
||||
val stx = requestPayload.signedTransaction
|
||||
checkInputs(stx.inputs)
|
||||
validateRequestSignature(NotarisationRequest(stx.inputs, stx.id), requestPayload.requestSignature)
|
||||
val notary = stx.notary
|
||||
checkNotary(notary)
|
||||
|
@ -49,3 +49,8 @@ rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
}
|
||||
p2pMessagingRetry {
|
||||
messageRedeliveryDelay = 30 seconds
|
||||
maxRetryCount = 3
|
||||
backoffBase = 2.0
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import com.zaxxer.hikari.HikariConfig
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.tools.shell.SSHDConfiguration
|
||||
@ -114,6 +115,7 @@ class NodeConfigurationImplTest {
|
||||
verifierType = VerifierType.InMemory,
|
||||
p2pAddress = NetworkHostAndPort("localhost", 0),
|
||||
messagingServerAddress = null,
|
||||
p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0),
|
||||
notary = null,
|
||||
certificateChainCheckPolicies = emptyList(),
|
||||
devMode = true,
|
||||
|
@ -15,8 +15,13 @@ import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.P2PMessagingRetryConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.network.NetworkMapCacheImpl
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
@ -80,8 +85,8 @@ class ArtemisMessagingTest {
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase=1.0)).whenever(it).p2pMessagingRetry
|
||||
}
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock())
|
||||
|
@ -10,39 +10,28 @@
|
||||
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.generateSignature
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.internal.NotaryChangeTransactionBuilder
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import net.corda.testing.node.MockNetworkNotarySpec
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class NotaryServiceTests {
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
@ -53,7 +42,10 @@ class NotaryServiceTests {
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappPackages = listOf("net.corda.testing.contracts"),
|
||||
notarySpecs = listOf(MockNetworkNotarySpec(DUMMY_NOTARY_NAME, validating = false))
|
||||
)
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
notaryServices = mockNet.defaultNotaryNode.services //TODO get rid of that
|
||||
notary = mockNet.defaultNotaryIdentity
|
||||
@ -66,201 +58,38 @@ class NotaryServiceTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should sign a unique transaction with a valid time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.setTimeWindow(Instant.now(), 30.seconds)
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val signatures = future.getOrThrow()
|
||||
signatures.forEach { it.verify(stx.id) }
|
||||
fun `should reject a transaction with too many inputs`() {
|
||||
notariseWithTooManyInputs(aliceNode, alice, notary, mockNet)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should sign a unique transaction without a time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
internal companion object {
|
||||
/** This is used by both [NotaryServiceTests] and [ValidatingNotaryServiceTests]. */
|
||||
fun notariseWithTooManyInputs(node: StartedNode<InternalMockNetwork.MockNode>, party: Party, notary: Party, network: InternalMockNetwork) {
|
||||
val stx = generateTransaction(node, party, notary)
|
||||
|
||||
val future = node.services.startFlow(DummyClientFlow(stx, notary)).resultFuture
|
||||
network.runNetwork()
|
||||
assertFailsWith<NotaryException> { future.getOrThrow() }
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val signatures = future.getOrThrow()
|
||||
signatures.forEach { it.verify(stx.id) }
|
||||
}
|
||||
private fun generateTransaction(node: StartedNode<InternalMockNetwork.MockNode>, party: Party, notary: Party): SignedTransaction {
|
||||
val inputs = (1..10_005).map { StateRef(SecureHash.randomSHA256(), 0) }
|
||||
val tx = NotaryChangeTransactionBuilder(inputs, notary, party).build()
|
||||
|
||||
@Test
|
||||
fun `should report error for transaction with an invalid time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.setTimeWindow(Instant.now().plusSeconds(3600), 30.seconds)
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
|
||||
assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should sign identical transaction multiple times (notarisation is idempotent)`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val firstAttempt = NotaryFlow.Client(stx)
|
||||
val secondAttempt = NotaryFlow.Client(stx)
|
||||
val f1 = aliceNode.services.startFlow(firstAttempt).resultFuture
|
||||
val f2 = aliceNode.services.startFlow(secondAttempt).resultFuture
|
||||
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Note that the notary will only return identical signatures when using deterministic signature
|
||||
// schemes (e.g. EdDSA) and when deterministic metadata is attached (no timestamps or nonces).
|
||||
// We only really care that both signatures are over the same transaction and by the same notary.
|
||||
val sig1 = f1.getOrThrow().single()
|
||||
assertEquals(sig1.by, notary.owningKey)
|
||||
assertTrue(sig1.isValid(stx.id))
|
||||
|
||||
val sig2 = f2.getOrThrow().single()
|
||||
assertEquals(sig2.by, notary.owningKey)
|
||||
assertTrue(sig2.isValid(stx.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report conflict when inputs are reused across transactions`() {
|
||||
val firstState = issueState(aliceNode.services, alice)
|
||||
val secondState = issueState(aliceNode.services, alice)
|
||||
|
||||
fun spendState(state: StateAndRef<*>): SignedTransaction {
|
||||
val stx = run {
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(state)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
return node.services.run {
|
||||
val myKey = myInfo.legalIdentities.first().owningKey
|
||||
val signableData = SignableData(tx.id, SignatureMetadata(myInfo.platformVersion, Crypto.findSignatureScheme(myKey).schemeNumberID))
|
||||
val mySignature = keyManagementService.sign(signableData, myKey)
|
||||
SignedTransaction(tx, listOf(mySignature))
|
||||
}
|
||||
aliceNode.services.startFlow(NotaryFlow.Client(stx))
|
||||
mockNet.runNetwork()
|
||||
return stx
|
||||
}
|
||||
|
||||
val firstSpendTx = spendState(firstState)
|
||||
val secondSpendTx = spendState(secondState)
|
||||
|
||||
val doubleSpendTx = run {
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(issueState(aliceNode.services, alice))
|
||||
.addInputState(firstState)
|
||||
.addInputState(secondState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val doubleSpend = NotaryFlow.Client(doubleSpendTx) // Double spend the inputState in a second transaction.
|
||||
val future = aliceNode.services.startFlow(doubleSpend)
|
||||
mockNet.runNetwork()
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { future.resultFuture.getOrThrow() }
|
||||
val notaryError = ex.error as NotaryError.Conflict
|
||||
assertEquals(notaryError.txId, doubleSpendTx.id)
|
||||
with(notaryError) {
|
||||
assertEquals(consumedStates.size, 2)
|
||||
assertEquals(consumedStates[firstState.ref]!!.hashOfTransactionId, firstSpendTx.id.sha256())
|
||||
assertEquals(consumedStates[secondState.ref]!!.hashOfTransactionId, secondSpendTx.id.sha256())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when notarisation request not signed by the requesting party`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val randomKeyPair = Crypto.generateKeyPair()
|
||||
val bytesToSign = NotarisationRequest(transaction.inputs, transaction.id).serialize().bytes
|
||||
val modifiedSignature = NotarisationRequestSignature(randomKeyPair.sign(bytesToSign), aliceNode.services.myInfo.platformVersion)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when incorrect notarisation request signed - inputs don't match`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val wrongInputs = listOf(StateRef(SecureHash.randomSHA256(), 0))
|
||||
val request = NotarisationRequest(wrongInputs, transaction.id)
|
||||
val modifiedSignature = request.generateSignature(aliceNode.services)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when incorrect notarisation request signed - transaction id doesn't match`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val wrongTransactionId = SecureHash.randomSHA256()
|
||||
val request = NotarisationRequest(transaction.inputs, wrongTransactionId)
|
||||
val modifiedSignature = request.generateSignature(aliceNode.services)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) {
|
||||
aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) {
|
||||
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
|
||||
val messageData = message.data.deserialize<Any>() as? InitialSessionMessage
|
||||
val payload = messageData?.firstPayload!!.deserialize()
|
||||
|
||||
if (payload is NotarisationPayload) {
|
||||
val alteredPayload = payloadModifier(payload)
|
||||
val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize())
|
||||
val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId)
|
||||
messagingService.send(alteredMessage, target, retryId)
|
||||
|
||||
} else {
|
||||
messagingService.send(message, target, retryId)
|
||||
}
|
||||
private class DummyClientFlow(stx: SignedTransaction, val notary: Party) : NotaryFlow.Client(stx) {
|
||||
@Suspendable
|
||||
override fun call(): List<TransactionSignature> {
|
||||
notarise(notary)
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
})
|
||||
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
|
||||
assertThat(ex.error).isInstanceOf(NotaryError.RequestSignatureInvalid::class.java)
|
||||
}
|
||||
|
||||
private fun runNotaryClient(stx: SignedTransaction): CordaFuture<List<TransactionSignature>> {
|
||||
val flow = NotaryFlow.Client(stx)
|
||||
val future = aliceNode.services.startFlow(flow).resultFuture
|
||||
mockNet.runNetwork()
|
||||
return future
|
||||
}
|
||||
|
||||
private fun issueState(services: ServiceHub, identity: Party): StateAndRef<*> {
|
||||
val tx = DummyContract.generateInitial(Random().nextInt(), notary, identity.ref(0))
|
||||
val signedByNode = services.signInitialTransaction(tx)
|
||||
val stx = notaryServices.addSignature(signedByNode, notary.owningKey)
|
||||
services.recordTransactions(stx)
|
||||
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
|
||||
}
|
||||
}
|
||||
|
@ -14,43 +14,49 @@ import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.generateSignature
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.issueInvalidState
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNodeParameters
|
||||
import net.corda.testing.node.StartedMockNode
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class ValidatingNotaryServiceTests {
|
||||
private lateinit var mockNet: MockNetwork
|
||||
private lateinit var notaryNode: StartedMockNode
|
||||
private lateinit var aliceNode: StartedMockNode
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var notaryNode: StartedNode<InternalMockNetwork.MockNode>
|
||||
private lateinit var aliceNode: StartedNode<InternalMockNetwork.MockNode>
|
||||
private lateinit var notary: Party
|
||||
private lateinit var alice: Party
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
|
||||
aliceNode = mockNet.createNode(MockNodeParameters(legalName = ALICE_NAME))
|
||||
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
notaryNode = mockNet.defaultNotaryNode
|
||||
notary = mockNet.defaultNotaryIdentity
|
||||
alice = aliceNode.info.singleIdentity()
|
||||
@ -71,7 +77,7 @@ class ValidatingNotaryServiceTests {
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runClient(stx)
|
||||
val future = runNotaryClient(stx)
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
|
||||
val notaryError = ex.error as NotaryError.TransactionInvalid
|
||||
@ -92,16 +98,205 @@ class ValidatingNotaryServiceTests {
|
||||
// Expecting SignaturesMissingException instead of NotaryException, since the exception should originate from
|
||||
// the client flow.
|
||||
val ex = assertFailsWith<SignedTransaction.SignaturesMissingException> {
|
||||
val future = runClient(stx)
|
||||
val future = runNotaryClient(stx)
|
||||
future.getOrThrow()
|
||||
}
|
||||
val missingKeys = ex.missing
|
||||
assertEquals(setOf(expectedMissingKey), missingKeys)
|
||||
}
|
||||
|
||||
private fun runClient(stx: SignedTransaction): CordaFuture<List<TransactionSignature>> {
|
||||
@Test
|
||||
fun `should sign a unique transaction with a valid time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.setTimeWindow(Instant.now(), 30.seconds)
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val signatures = future.getOrThrow()
|
||||
signatures.forEach { it.verify(stx.id) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should sign a unique transaction without a time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val signatures = future.getOrThrow()
|
||||
signatures.forEach { it.verify(stx.id) }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report error for transaction with an invalid time-window`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.setTimeWindow(Instant.now().plusSeconds(3600), 30.seconds)
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
|
||||
assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should sign identical transaction multiple times (notarisation is idempotent)`() {
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val firstAttempt = NotaryFlow.Client(stx)
|
||||
val secondAttempt = NotaryFlow.Client(stx)
|
||||
val f1 = aliceNode.services.startFlow(firstAttempt).resultFuture
|
||||
val f2 = aliceNode.services.startFlow(secondAttempt).resultFuture
|
||||
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Note that the notary will only return identical signatures when using deterministic signature
|
||||
// schemes (e.g. EdDSA) and when deterministic metadata is attached (no timestamps or nonces).
|
||||
// We only really care that both signatures are over the same transaction and by the same notary.
|
||||
val sig1 = f1.getOrThrow().single()
|
||||
assertEquals(sig1.by, notary.owningKey)
|
||||
assertTrue(sig1.isValid(stx.id))
|
||||
|
||||
val sig2 = f2.getOrThrow().single()
|
||||
assertEquals(sig2.by, notary.owningKey)
|
||||
assertTrue(sig2.isValid(stx.id))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report conflict when inputs are reused across transactions`() {
|
||||
val firstState = issueState(aliceNode.services, alice)
|
||||
val secondState = issueState(aliceNode.services, alice)
|
||||
|
||||
fun spendState(state: StateAndRef<*>): SignedTransaction {
|
||||
val stx = run {
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(state)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
aliceNode.services.startFlow(NotaryFlow.Client(stx))
|
||||
mockNet.runNetwork()
|
||||
return stx
|
||||
}
|
||||
|
||||
val firstSpendTx = spendState(firstState)
|
||||
val secondSpendTx = spendState(secondState)
|
||||
|
||||
val doubleSpendTx = run {
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(issueState(aliceNode.services, alice))
|
||||
.addInputState(firstState)
|
||||
.addInputState(secondState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val doubleSpend = NotaryFlow.Client(doubleSpendTx) // Double spend the inputState in a second transaction.
|
||||
val future = aliceNode.services.startFlow(doubleSpend)
|
||||
mockNet.runNetwork()
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { future.resultFuture.getOrThrow() }
|
||||
val notaryError = ex.error as NotaryError.Conflict
|
||||
assertEquals(notaryError.txId, doubleSpendTx.id)
|
||||
with(notaryError) {
|
||||
assertEquals(consumedStates.size, 2)
|
||||
assertEquals(consumedStates[firstState.ref]!!.hashOfTransactionId, firstSpendTx.id.sha256())
|
||||
assertEquals(consumedStates[secondState.ref]!!.hashOfTransactionId, secondSpendTx.id.sha256())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when notarisation request not signed by the requesting party`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val randomKeyPair = Crypto.generateKeyPair()
|
||||
val bytesToSign = NotarisationRequest(transaction.inputs, transaction.id).serialize().bytes
|
||||
val modifiedSignature = NotarisationRequestSignature(randomKeyPair.sign(bytesToSign), aliceNode.services.myInfo.platformVersion)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when incorrect notarisation request signed - inputs don't match`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val wrongInputs = listOf(StateRef(SecureHash.randomSHA256(), 0))
|
||||
val request = NotarisationRequest(wrongInputs, transaction.id)
|
||||
val modifiedSignature = request.generateSignature(aliceNode.services)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject when incorrect notarisation request signed - transaction id doesn't match`() {
|
||||
runNotarisationAndInterceptClientPayload { originalPayload ->
|
||||
val transaction = originalPayload.signedTransaction
|
||||
val wrongTransactionId = SecureHash.randomSHA256()
|
||||
val request = NotarisationRequest(transaction.inputs, wrongTransactionId)
|
||||
val modifiedSignature = request.generateSignature(aliceNode.services)
|
||||
originalPayload.copy(requestSignature = modifiedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should reject a transaction with too many inputs`() {
|
||||
NotaryServiceTests.notariseWithTooManyInputs(aliceNode, alice, notary, mockNet)
|
||||
}
|
||||
|
||||
private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) {
|
||||
aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) {
|
||||
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
|
||||
val messageData = message.data.deserialize<Any>() as? InitialSessionMessage
|
||||
val payload = messageData?.firstPayload!!.deserialize()
|
||||
|
||||
if (payload is NotarisationPayload) {
|
||||
val alteredPayload = payloadModifier(payload)
|
||||
val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize())
|
||||
val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId)
|
||||
messagingService.send(alteredMessage, target, retryId)
|
||||
|
||||
} else {
|
||||
messagingService.send(message, target, retryId)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
val stx = run {
|
||||
val inputState = issueState(aliceNode.services, alice)
|
||||
val tx = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
aliceNode.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val future = runNotaryClient(stx)
|
||||
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
|
||||
assertThat(ex.error).isInstanceOf(NotaryError.RequestSignatureInvalid::class.java)
|
||||
}
|
||||
|
||||
private fun runNotaryClient(stx: SignedTransaction): CordaFuture<List<TransactionSignature>> {
|
||||
val flow = NotaryFlow.Client(stx)
|
||||
val future = aliceNode.startFlow(flow)
|
||||
val future = aliceNode.services.startFlow(flow).resultFuture
|
||||
mockNet.runNetwork()
|
||||
return future
|
||||
}
|
||||
|
@ -473,7 +473,7 @@ private fun mockNodeConfiguration(): NodeConfiguration {
|
||||
doReturn(null).whenever(it).compatibilityZoneURL
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(VerifierType.InMemory).whenever(it).verifierType
|
||||
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
||||
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry
|
||||
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
|
||||
doReturn(null).whenever(it).devModeOptions
|
||||
doReturn(EnterpriseConfiguration(
|
||||
|
Loading…
x
Reference in New Issue
Block a user