Mirror of https://github.com/corda/corda.git (synced 2025-06-17 06:38:21 +00:00)
Removed the startNotary methods from NodeBasedTest and moved NodeBasedTest into an internal package

This commit is in preparation for the upcoming network parameters work.
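The diffs below all make the same move: integration tests stop inheriting notary and node startup from NodeBasedTest and either drive everything through the driver DSL or import the relocated net.corda.testing.internal.NodeBasedTest. A minimal Kotlin sketch of the driver-based shape, assembled only from calls that appear in the diffs below (driver, startNotaryCluster, startNode, NodeHandle.InProcess); exact signatures may differ in other Corda versions:

import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import org.junit.Test

class DriverBasedNotarySketch {
    // Illustrative name only, mirroring the RAFT notary name used in RaftNotaryServiceTests below.
    private val notaryName = CordaX500Name("RaftNotary", "Sketch Notary", "London", "GB")

    @Test
    fun `start a notary cluster through the driver instead of NodeBasedTest`() {
        driver(startNodesInProcess = true) {
            // Previously this was NodeBasedTest.startNotaryCluster(...) on the test class itself.
            val (notaryParty) = startNotaryCluster(notaryName, 3).getOrThrow()
            // In-process handles expose the underlying node via NodeHandle.InProcess.
            val bankA = startNode(providedName = DUMMY_BANK_A.name)
                    .map { (it as NodeHandle.InProcess).node }
                    .getOrThrow()
            // ... exercise notaryParty and bankA as RaftNotaryServiceTests does below ...
        }
    }
}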
@@ -8,53 +8,53 @@ import net.corda.core.flows.NotaryFlow
 import net.corda.core.identity.CordaX500Name
 import net.corda.core.identity.Party
 import net.corda.core.internal.concurrent.map
-import net.corda.core.internal.concurrent.transpose
 import net.corda.core.transactions.TransactionBuilder
 import net.corda.core.utilities.getOrThrow
 import net.corda.node.internal.StartedNode
 import net.corda.node.services.transactions.RaftValidatingNotaryService
-import net.corda.testing.*
+import net.corda.testing.DUMMY_BANK_A
+import net.corda.testing.chooseIdentity
 import net.corda.testing.contracts.DummyContract
-import net.corda.testing.node.NodeBasedTest
+import net.corda.testing.driver.NodeHandle
+import net.corda.testing.driver.driver
+import net.corda.testing.dummyCommand
 import org.junit.Test
 import java.util.*
 import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
 
-class RaftNotaryServiceTests : NodeBasedTest(listOf("net.corda.testing.contracts")) {
+class RaftNotaryServiceTests {
     private val notaryName = CordaX500Name(RaftValidatingNotaryService.id, "RAFT Notary Service", "London", "GB")
 
     @Test
     fun `detect double spend`() {
-        val (bankA) = listOf(
-                startNode(DUMMY_BANK_A.name),
-                startNotaryCluster(notaryName, 3).map { it.first() }
-        ).transpose().getOrThrow()
-        val notaryParty = bankA.services.networkMapCache.getNotary(notaryName)!!
+        driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts")) {
+            val (notaryParty) = startNotaryCluster(notaryName, 3).getOrThrow()
+            val bankA = startNode(providedName = DUMMY_BANK_A.name).map { (it as NodeHandle.InProcess).node }.getOrThrow()
 
         val inputState = issueState(bankA, notaryParty)
 
         val firstTxBuilder = TransactionBuilder(notaryParty)
                 .addInputState(inputState)
                 .addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
         val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder)
 
         val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx))
         firstSpend.resultFuture.getOrThrow()
 
         val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run {
             val dummyState = DummyContract.SingleOwnerState(0, bankA.info.chooseIdentity())
             addOutputState(dummyState, DummyContract.PROGRAM_ID)
             addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey))
             this
         }
         val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder)
         val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx))
 
         val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() }
         val error = ex.error as NotaryError.Conflict
         assertEquals(error.txId, secondSpendTx.id)
+        }
     }
 
     private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> {
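The (it as NodeHandle.InProcess).node cast above (and the same cast in the messaging tests further down) is how these driver-based tests reach node internals, a StartedNode, that a plain NodeHandle does not expose. A hypothetical convenience helper for that unwrapping, built only from types that appear in this diff and not part of the Corda test API:

import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.map
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.driver.NodeHandle

// Hypothetical helper: unwrap an in-process NodeHandle future into the underlying StartedNode.
// Throws a ClassCastException if the node was started out of process.
fun CordaFuture<NodeHandle>.inProcessNode(): CordaFuture<StartedNode<Node>> =
        map { (it as NodeHandle.InProcess).node }

With such a helper, the startup line above would read startNode(providedName = DUMMY_BANK_A.name).inProcessNode().getOrThrow().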
@@ -13,14 +13,12 @@ import net.corda.testing.ALICE
 import net.corda.testing.ALICE_KEY
 import net.corda.testing.DEV_TRUST_ROOT
 import net.corda.testing.getTestPartyAndCertificate
+import net.corda.testing.internal.NodeBasedTest
 import net.corda.testing.node.MockKeyManagementService
-import net.corda.testing.node.NodeBasedTest
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.contentOf
 import org.junit.Before
-import org.junit.Rule
 import org.junit.Test
-import org.junit.rules.TemporaryFolder
 import rx.observers.TestSubscriber
 import rx.schedulers.TestScheduler
 import java.nio.file.Path
@@ -29,42 +27,37 @@ import kotlin.test.assertEquals
 import kotlin.test.assertTrue
 
 class NodeInfoWatcherTest : NodeBasedTest() {
-    @Rule
-    @JvmField
-    var folder = TemporaryFolder()
-
-    lateinit var keyManagementService: KeyManagementService
-    lateinit var nodeInfoPath: Path
-    val scheduler = TestScheduler()
-    val testSubscriber = TestSubscriber<NodeInfo>()
-
-    // Object under test
-    lateinit var nodeInfoWatcher: NodeInfoWatcher
-
     companion object {
         val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0)
     }
 
+    private lateinit var keyManagementService: KeyManagementService
+    private lateinit var nodeInfoPath: Path
+    private val scheduler = TestScheduler()
+    private val testSubscriber = TestSubscriber<NodeInfo>()
+
+    // Object under test
+    private lateinit var nodeInfoWatcher: NodeInfoWatcher
+
     @Before
     fun start() {
         val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
         keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
-        nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler = scheduler)
-        nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
+        nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler)
+        nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
     }
 
     @Test
     fun `save a NodeInfo`() {
         assertEquals(0,
-                folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
-        NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService)
+                tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
+        NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfo, keyManagementService)
 
-        val nodeInfoFiles = folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
+        val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
         assertEquals(1, nodeInfoFiles.size)
         val fileName = nodeInfoFiles.first()
         assertTrue(fileName.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX))
-        val file = (folder.root.path / fileName).toFile()
+        val file = (tempFolder.root.path / fileName).toFile()
         // Just check that something is written, another tests verifies that the written value can be read back.
         assertThat(contentOf(file)).isNotEmpty()
     }
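The folder-to-tempFolder change above drops the test's own TemporaryFolder rule in favour of one apparently provided by the relocated NodeBasedTest base class. A minimal sketch of that general JUnit 4 idiom, assuming a base class that owns the rule (this is not the actual net.corda.testing.internal.NodeBasedTest source):

import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import kotlin.test.assertTrue

// Assumed shape only: the base class declares the rule once so every subclass shares it.
abstract class TempFolderBaseTest {
    @Rule
    @JvmField
    val tempFolder = TemporaryFolder()
}

class UsesSharedTempFolder : TempFolderBaseTest() {
    @Test
    fun `subclass can write into the inherited folder`() {
        val file = tempFolder.newFile("node-info-sketch.txt")
        assertTrue(file.exists())
    }
}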
@@ -1,32 +1,28 @@
 package net.corda.node.services.network
 
-import co.paralleluniverse.fibers.Suspendable
-import net.corda.core.flows.FlowLogic
-import net.corda.core.flows.FlowSession
-import net.corda.core.flows.InitiatedBy
-import net.corda.core.flows.InitiatingFlow
 import net.corda.core.identity.CordaX500Name
 import net.corda.core.identity.Party
 import net.corda.core.node.NodeInfo
-import net.corda.core.utilities.*
+import net.corda.core.utilities.NetworkHostAndPort
 import net.corda.node.internal.Node
 import net.corda.node.internal.StartedNode
-import net.corda.testing.*
-import net.corda.testing.node.NodeBasedTest
 import org.assertj.core.api.Assertions.assertThat
+import net.corda.testing.ALICE
+import net.corda.testing.BOB
+import net.corda.testing.DUMMY_NOTARY
+import net.corda.testing.chooseIdentity
+import net.corda.testing.internal.NodeBasedTest
 import org.junit.Before
 import org.junit.Test
 import kotlin.test.assertEquals
 
 class PersistentNetworkMapCacheTest : NodeBasedTest() {
     private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
-    private val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
+    private val addressesMap = HashMap<CordaX500Name, NetworkHostAndPort>()
     private val infos: MutableSet<NodeInfo> = HashSet()
 
     @Before
     fun start() {
         val nodes = startNodesWithPort(partiesList)
         nodes.forEach { it.internals.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting.
         nodes.forEach {
             infos.add(it.info)
             addressesMap[it.info.chooseIdentity().name] = it.info.addresses[0]
@@ -35,7 +31,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
     }
 
     @Test
-    fun `get nodes by owning key and by name, no network map service`() {
+    fun `get nodes by owning key and by name`() {
         val alice = startNodesWithPort(listOf(ALICE))[0]
         val netCache = alice.services.networkMapCache
         alice.database.transaction {
@@ -47,7 +43,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
     }
 
     @Test
-    fun `get nodes by address no network map service`() {
+    fun `get nodes by address`() {
         val alice = startNodesWithPort(listOf(ALICE))[0]
         val netCache = alice.services.networkMapCache
         alice.database.transaction {
@@ -57,92 +53,20 @@
     }
 
     @Test
-    fun `restart node with DB map cache and no network map`() {
+    fun `restart node with DB map cache`() {
         val alice = startNodesWithPort(listOf(ALICE))[0]
         val partyNodes = alice.services.networkMapCache.allNodes
         assertEquals(infos.size, partyNodes.size)
         assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
     }
 
-    @Test
-    fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() {
-        val parties = partiesList.subList(1, partiesList.size)
-        val nodes = startNodesWithPort(parties)
-        nodes.forEach {
-            val partyNodes = it.services.networkMapCache.allNodes
-            assertEquals(infos.size, partyNodes.size)
-            assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
-        }
-        checkConnectivity(nodes)
-    }
-
-    @Test
-    fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() {
-        val parties = partiesList.subList(1, partiesList.size)
-        val nodes = startNodesWithPort(parties)
-        nodes.forEach {
-            val partyNodes = it.services.networkMapCache.allNodes
-            assertEquals(infos.size, partyNodes.size)
-            assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
-        }
-        checkConnectivity(nodes)
-    }
-
-    @Test
-    fun `start node and network map communicate`() {
-        val parties = partiesList.subList(0, 2)
-        val nodes = startNodesWithPort(parties)
-        checkConnectivity(nodes)
-    }
-
-    @Test
-    fun `start node without networkMapService and no database - success`() {
-        startNode(CHARLIE.name).getOrThrow(2.seconds)
-    }
-
     // HELPERS
     // Helper function to restart nodes with the same host and port.
     private fun startNodesWithPort(nodesToStart: List<Party>, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
         return nodesToStart.map { party ->
             val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
                     (customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
-            startNode(party.name,
-                    configOverrides = configOverrides,
-                    waitForConnection = false).getOrThrow()
+            startNode(party.name, configOverrides = configOverrides)
         }
     }
 
-    // Check that nodes are functional, communicate each with each.
-    private fun checkConnectivity(nodes: List<StartedNode<*>>) {
-        nodes.forEach { node1 ->
-            nodes.forEach { node2 ->
-                if (!(node1 === node2)) { // Do not check connectivity to itself
-                    node2.internals.registerInitiatedFlow(SendBackFlow::class.java)
-                    val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture
-                    assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
-                }
-            }
-        }
-    }
-
-    @InitiatingFlow
-    private class SendFlow(val otherParty: Party) : FlowLogic<String>() {
-        @Suspendable
-        override fun call(): String {
-            logger.info("SEND FLOW to $otherParty")
-            logger.info("Party key ${otherParty.owningKey.toBase58String()}")
-            val session = initiateFlow(otherParty)
-            return session.sendAndReceive<String>("Hi!").unwrap { it }
-        }
-    }
-
-    @InitiatedBy(SendFlow::class)
-    private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic<Unit>() {
-        @Suspendable
-        override fun call() {
-            logger.info("SEND BACK FLOW to ${otherSideSession.counterparty}")
-            logger.info("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}")
-            otherSideSession.send("Hello!")
-        }
-    }
 }
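The startNodesWithPort helper above restarts nodes on their previous address by feeding the remembered p2pAddress back in as a config override. A self-contained sketch of just that override-building step, using the same keys as the helper (the node-starting call itself is elided):

import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort

// Mirror of the helper's override construction: pin p2pAddress when the old address is known,
// and optionally tune the Artemis bridge retry interval.
fun overridesFor(
        name: CordaX500Name,
        addressesMap: Map<CordaX500Name, NetworkHostAndPort>,
        customRetryIntervalMs: Long? = null
): Map<String, Any> {
    val address = addressesMap[name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()
    val retry = customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap()
    return address + retry
}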
@@ -5,22 +5,20 @@ import net.corda.core.flows.FlowLogic
 import net.corda.core.flows.FlowSession
 import net.corda.core.flows.InitiatingFlow
 import net.corda.core.identity.Party
-import net.corda.core.internal.concurrent.transpose
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.unwrap
 import net.corda.testing.ALICE
 import net.corda.testing.BOB
 import net.corda.testing.chooseIdentity
-import net.corda.testing.node.NodeBasedTest
+import net.corda.testing.internal.NodeBasedTest
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.Test
 
 class FlowVersioningTest : NodeBasedTest() {
     @Test
     fun `getFlowContext returns the platform version for core flows`() {
-        val (alice, bob) = listOf(
-                startNode(ALICE.name, platformVersion = 2),
-                startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow()
+        val alice = startNode(ALICE.name, platformVersion = 2)
+        val bob = startNode(BOB.name, platformVersion = 3)
         bob.internals.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow)
         val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow(
                 PretendInitiatingCoreFlow(bob.info.chooseIdentity())).resultFuture.getOrThrow()
@@ -28,8 +28,8 @@ import net.corda.testing.ALICE
 import net.corda.testing.BOB
 import net.corda.testing.chooseIdentity
 import net.corda.testing.configureTestSSL
+import net.corda.testing.internal.NodeBasedTest
 import net.corda.testing.messaging.SimpleMQClient
-import net.corda.testing.node.NodeBasedTest
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
 import org.apache.activemq.artemis.api.core.SimpleString
@@ -52,7 +52,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
 
     @Before
     fun start() {
-        alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser).getOrThrow()
+        alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser)
         attacker = createAttacker()
         startAttacker(attacker)
     }
@@ -87,7 +87,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
 
     @Test
     fun `create queue for peer which has not been communicated with`() {
-        val bob = startNode(BOB.name).getOrThrow()
+        val bob = startNode(BOB.name)
         assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
     }
@@ -219,7 +219,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
     }
 
     private fun startBobAndCommunicateWithAlice(): Party {
-        val bob = startNode(BOB.name).getOrThrow()
+        val bob = startNode(BOB.name)
         bob.internals.registerInitiatedFlow(ReceiveFlow::class.java)
         val bobParty = bob.info.chooseIdentity()
         // Perform a protocol exchange to force the peer queue to be created
@@ -3,10 +3,8 @@ package net.corda.services.messaging
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.crypto.random63BitValue
 import net.corda.core.identity.CordaX500Name
-import net.corda.core.internal.concurrent.transpose
-import net.corda.core.internal.elapsedTime
+import net.corda.core.internal.concurrent.map
 import net.corda.core.internal.randomOrNull
-import net.corda.core.internal.times
 import net.corda.core.messaging.MessageRecipients
 import net.corda.core.messaging.SingleMessageRecipient
 import net.corda.core.serialization.CordaSerializable
@@ -14,116 +12,122 @@ import net.corda.core.serialization.deserialize
 import net.corda.core.serialization.serialize
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.seconds
 import net.corda.node.internal.Node
 import net.corda.node.internal.StartedNode
 import net.corda.node.services.messaging.*
 import net.corda.node.services.transactions.RaftValidatingNotaryService
-import net.corda.testing.*
-import net.corda.testing.node.NodeBasedTest
+import net.corda.testing.ALICE
+import net.corda.testing.chooseIdentity
+import net.corda.testing.driver.DriverDSLExposedInterface
+import net.corda.testing.driver.NodeHandle
+import net.corda.testing.driver.driver
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.Ignore
 import org.junit.Test
 import java.util.*
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
-class P2PMessagingTest : NodeBasedTest() {
+class P2PMessagingTest {
     private companion object {
         val DISTRIBUTED_SERVICE_NAME = CordaX500Name(RaftValidatingNotaryService.id, "DistributedService", "London", "GB")
     }
 
-    @Test
-    fun `network map will work after restart`() {
-        val identities = listOf(DUMMY_BANK_A, DUMMY_BANK_B, DUMMY_NOTARY)
-        fun startNodes() = identities.map { startNode(it.name) }.transpose()
-
-        val startUpDuration = elapsedTime { startNodes().getOrThrow() }
-        // Start the network map a second time - this will restore message queues from the journal.
-        // This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
-        clearAllNodeInfoDb() // Clear network map data from nodes databases.
-        stopAllNodes()
-        startNodes().getOrThrow(timeout = startUpDuration * 3)
-    }
-
     @Test
     fun `communicating with a distributed service which we're part of`() {
-        val distributedService = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
-        assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0])
+        driver(startNodesInProcess = true) {
+            val distributedService = startDistributedService()
+            assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0])
+        }
     }
 
     @Test
     fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() {
-        val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
-        val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
-        val serviceAddress = alice.services.networkMapCache.run {
-            val notaryParty = notaryIdentities.randomOrNull()!!
-            alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
-        }
+        driver(startNodesInProcess = true) {
+            val distributedServiceNodes = startDistributedService()
+            val alice = startAlice()
+            val serviceAddress = alice.services.networkMapCache.run {
+                val notaryParty = notaryIdentities.randomOrNull()!!
+                alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
+            }
 
         val dummyTopic = "dummy.topic"
         val responseMessage = "response"
 
         val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
 
         // Send a single request with retry
         val responseFuture = with(alice.network) {
             val request = TestRequest(replyTo = myAddress)
             val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
             val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
             send(msg, serviceAddress, retryId = request.sessionID)
             responseFuture
         }
         crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
         // The request wasn't successful.
         assertThat(responseFuture.isDone).isFalse()
         crashingNodes.ignoreRequests = false
 
         // The retry should be successful.
         val response = responseFuture.getOrThrow(10.seconds)
         assertThat(response).isEqualTo(responseMessage)
+        }
     }
 
     @Test
     @Ignore("Fails on Team City due to issues with restaring nodes.")
     fun `distributed service request retries are persisted across client node restarts`() {
-        val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
-        val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
-        val serviceAddress = alice.services.networkMapCache.run {
-            val notaryParty = notaryIdentities.randomOrNull()!!
-            alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
-        }
+        driver(startNodesInProcess = true) {
+            val distributedServiceNodes = startDistributedService()
+            val alice = startAlice()
+            val serviceAddress = alice.services.networkMapCache.run {
+                val notaryParty = notaryIdentities.randomOrNull()!!
+                alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!)
+            }
 
         val dummyTopic = "dummy.topic"
         val responseMessage = "response"
 
         val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
 
         val sessionId = random63BitValue()
 
         // Send a single request with retry
         with(alice.network) {
             val request = TestRequest(sessionId, myAddress)
             val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
             send(msg, serviceAddress, retryId = request.sessionID)
         }
 
         // Wait until the first request is received
         crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
         // Stop alice's node after we ensured that the first request was delivered and ignored.
-        alice.services.networkMapCache.clearNetworkMapCache()
         alice.dispose()
         val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
         assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
 
         crashingNodes.ignoreRequests = false
 
         // Restart the node and expect a response
-        val aliceRestarted = startNode(ALICE.name, waitForConnection = true, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 5)).getOrThrow()
+        val aliceRestarted = startAlice()
         val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
 
         assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
         assertThat(response).isEqualTo(responseMessage)
+        }
     }
 
+    private fun DriverDSLExposedInterface.startDistributedService(): List<StartedNode<Node>> {
+        return startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2)
+                .getOrThrow()
+                .second
+                .map { (it as NodeHandle.InProcess).node }
+    }
+
+    private fun DriverDSLExposedInterface.startAlice(): StartedNode<Node> {
+        return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
+                .map { (it as NodeHandle.InProcess).node }
+                .getOrThrow()
+    }
+
     data class CrashingNodes(
@@ -203,4 +207,4 @@ class P2PMessagingTest : NodeBasedTest() {
     @CordaSerializable
     private data class TestRequest(override val sessionID: Long = random63BitValue(),
                                    override val replyTo: SingleMessageRecipient) : ServiceRequestMessage
 }
 }