mirror of
https://github.com/corda/corda.git
synced 2025-05-29 21:54:26 +00:00
CORDA-699 Add injection or modification of memory network messages (#1920)
This commit is contained in:
parent
4b8590ef41
commit
2fe3fbbfef
@ -0,0 +1,111 @@
|
|||||||
|
package net.corda.docs.tutorial.mocknetwork
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import net.corda.core.contracts.requireThat
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.FlowSession
|
||||||
|
import net.corda.core.flows.InitiatedBy
|
||||||
|
import net.corda.core.flows.InitiatingFlow
|
||||||
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.messaging.MessageRecipients
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
|
import net.corda.core.utilities.getOrThrow
|
||||||
|
import net.corda.core.utilities.unwrap
|
||||||
|
import net.corda.node.internal.StartedNode
|
||||||
|
import net.corda.node.services.messaging.Message
|
||||||
|
import net.corda.node.services.statemachine.SessionData
|
||||||
|
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||||
|
import net.corda.testing.node.MessagingServiceSpy
|
||||||
|
import net.corda.testing.node.MockNetwork
|
||||||
|
import net.corda.testing.node.setMessagingServiceSpy
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Rule
|
||||||
|
import org.junit.Test
|
||||||
|
import org.junit.rules.ExpectedException
|
||||||
|
|
||||||
|
class TutorialMockNetwork {
|
||||||
|
|
||||||
|
@InitiatingFlow
|
||||||
|
class FlowA(private val otherParty: Party) : FlowLogic<Unit>() {
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
val session = initiateFlow(otherParty)
|
||||||
|
|
||||||
|
session.receive<Int>().unwrap {
|
||||||
|
requireThat { "Expected to receive 1" using (it == 1) }
|
||||||
|
}
|
||||||
|
|
||||||
|
session.receive<Int>().unwrap {
|
||||||
|
requireThat { "Expected to receive 2" using (it == 2) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@InitiatedBy(FlowA::class)
|
||||||
|
class FlowB(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
session.send(1)
|
||||||
|
session.send(2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lateinit private var mockNet: MockNetwork
|
||||||
|
lateinit private var notary: StartedNode<MockNetwork.MockNode>
|
||||||
|
lateinit private var nodeA: StartedNode<MockNetwork.MockNode>
|
||||||
|
lateinit private var nodeB: StartedNode<MockNetwork.MockNode>
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
@JvmField
|
||||||
|
val expectedEx: ExpectedException = ExpectedException.none()
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun setUp() {
|
||||||
|
mockNet = MockNetwork()
|
||||||
|
notary = mockNet.createNotaryNode()
|
||||||
|
nodeA = mockNet.createPartyNode()
|
||||||
|
nodeB = mockNet.createPartyNode()
|
||||||
|
|
||||||
|
nodeB.registerInitiatedFlow(FlowB::class.java)
|
||||||
|
|
||||||
|
mockNet.runNetwork()
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun tearDown() {
|
||||||
|
mockNet.stopNodes()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `fail if initiated doesn't send back 1 on first result`() {
|
||||||
|
|
||||||
|
// DOCSTART 1
|
||||||
|
// modify message if it's 1
|
||||||
|
nodeB.setMessagingServiceSpy(object : MessagingServiceSpy(nodeB.network) {
|
||||||
|
|
||||||
|
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, acknowledgementHandler: (() -> Unit)?) {
|
||||||
|
val messageData = message.data.deserialize<Any>()
|
||||||
|
|
||||||
|
if (messageData is SessionData && messageData.payload.deserialize() == 1) {
|
||||||
|
val alteredMessageData = SessionData(messageData.recipientSessionId, 99.serialize()).serialize().bytes
|
||||||
|
messagingService.send(InMemoryMessagingNetwork.InMemoryMessage(message.topicSession, alteredMessageData, message.uniqueMessageId), target, retryId)
|
||||||
|
} else {
|
||||||
|
messagingService.send(message, target, retryId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
// DOCEND 1
|
||||||
|
|
||||||
|
val initiatingReceiveFlow = nodeA.services.startFlow(FlowA(nodeB.info.legalIdentities.first()))
|
||||||
|
|
||||||
|
mockNet.runNetwork()
|
||||||
|
|
||||||
|
expectedEx.expect(IllegalArgumentException::class.java)
|
||||||
|
expectedEx.expectMessage("Expected to receive 1")
|
||||||
|
initiatingReceiveFlow.resultFuture.getOrThrow()
|
||||||
|
}
|
||||||
|
}
|
@ -63,4 +63,17 @@ transaction as shown here.
|
|||||||
|
|
||||||
With regards to initiated flows (see :doc:`flow-state-machines` for information on initiated and initiating flows), the
|
With regards to initiated flows (see :doc:`flow-state-machines` for information on initiated and initiating flows), the
|
||||||
full node automatically registers them by scanning the CorDapp jars. In a unit test environment this is not possible so
|
full node automatically registers them by scanning the CorDapp jars. In a unit test environment this is not possible so
|
||||||
``MockNode`` has the ``registerInitiatedFlow`` method to manually register an initiated flow.
|
``MockNode`` has the ``registerInitiatedFlow`` method to manually register an initiated flow.
|
||||||
|
|
||||||
|
MockNetwork message manipulation
|
||||||
|
--------------------------------
|
||||||
|
The MockNetwork has the ability to manipulate message streams. You can use this to test your flows behaviour on corrupted,
|
||||||
|
or malicious data received.
|
||||||
|
|
||||||
|
Message modification example in ``TutorialMockNetwork.kt``:
|
||||||
|
|
||||||
|
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/mocknetwork/TutorialMockNetwork.kt
|
||||||
|
:language: kotlin
|
||||||
|
:start-after: DOCSTART 1
|
||||||
|
:end-before: DOCEND 1
|
||||||
|
:dedent: 8
|
||||||
|
@ -279,7 +279,7 @@ class InMemoryMessagingNetwork(
|
|||||||
_sentMessages.onNext(transfer)
|
_sentMessages.onNext(transfer)
|
||||||
}
|
}
|
||||||
|
|
||||||
private data class InMemoryMessage(override val topicSession: TopicSession,
|
data class InMemoryMessage(override val topicSession: TopicSession,
|
||||||
override val data: ByteArray,
|
override val data: ByteArray,
|
||||||
override val uniqueMessageId: UUID,
|
override val uniqueMessageId: UUID,
|
||||||
override val debugTimestamp: Instant = Instant.now()) : Message {
|
override val debugTimestamp: Instant = Instant.now()) : Message {
|
||||||
|
@ -17,6 +17,7 @@ import net.corda.core.messaging.RPCOps
|
|||||||
import net.corda.core.messaging.SingleMessageRecipient
|
import net.corda.core.messaging.SingleMessageRecipient
|
||||||
import net.corda.core.node.services.IdentityService
|
import net.corda.core.node.services.IdentityService
|
||||||
import net.corda.core.node.services.KeyManagementService
|
import net.corda.core.node.services.KeyManagementService
|
||||||
|
import net.corda.core.node.services.PartyInfo
|
||||||
import net.corda.core.serialization.SerializationWhitelist
|
import net.corda.core.serialization.SerializationWhitelist
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
@ -31,7 +32,7 @@ import net.corda.node.services.config.BFTSMaRtConfiguration
|
|||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.NotaryConfig
|
import net.corda.node.services.config.NotaryConfig
|
||||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||||
import net.corda.node.services.messaging.MessagingService
|
import net.corda.node.services.messaging.*
|
||||||
import net.corda.node.services.network.InMemoryNetworkMapService
|
import net.corda.node.services.network.InMemoryNetworkMapService
|
||||||
import net.corda.node.services.network.NetworkMapService
|
import net.corda.node.services.network.NetworkMapService
|
||||||
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
||||||
@ -53,6 +54,7 @@ import java.math.BigInteger
|
|||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.security.KeyPair
|
import java.security.KeyPair
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
import java.util.*
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
@ -214,6 +216,10 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
|
|||||||
.getOrThrow()
|
.getOrThrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
|
||||||
|
network = messagingServiceSpy
|
||||||
|
}
|
||||||
|
|
||||||
override fun makeKeyManagementService(identityService: IdentityService): KeyManagementService {
|
override fun makeKeyManagementService(identityService: IdentityService): KeyManagementService {
|
||||||
return E2ETestKeyManagementService(identityService, partyKeys + (notaryIdentity?.let { setOf(it.second) } ?: emptySet()))
|
return E2ETestKeyManagementService(identityService, partyKeys + (notaryIdentity?.let { setOf(it.second) } ?: emptySet()))
|
||||||
}
|
}
|
||||||
@ -421,4 +427,16 @@ fun network(nodesCount: Int, action: MockNetwork.(nodes: List<StartedNode<MockNe
|
|||||||
val nodes = (1..nodesCount).map { _ -> it.createPartyNode() }
|
val nodes = (1..nodesCount).map { _ -> it.createPartyNode() }
|
||||||
action(it, nodes, notary)
|
action(it, nodes, notary)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extend this class in order to intercept and modify messages passing through the [MessagingService] when using the [InMemoryNetwork].
|
||||||
|
*/
|
||||||
|
open class MessagingServiceSpy(val messagingService: MessagingService) : MessagingService by messagingService
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attach a [MessagingServiceSpy] to the [MockNode] allowing interception and modification of messages.
|
||||||
|
*/
|
||||||
|
fun StartedNode<MockNetwork.MockNode>.setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
|
||||||
|
internals.setMessagingServiceSpy(messagingServiceSpy)
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user