diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 17660cfe37..efb817f54a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory import java.io.IOException import java.time.Clock import java.util.* +import java.util.concurrent.atomic.AtomicInteger import javax.management.ObjectName import kotlin.system.exitProcess @@ -83,6 +84,8 @@ open class Node(override val configuration: FullNodeConfiguration, private fun createClock(configuration: FullNodeConfiguration): Clock { return if (configuration.useTestClock) TestClock() else NodeClock() } + + private val sameVmNodeCounter = AtomicInteger() } override val log: Logger get() = logger @@ -90,6 +93,8 @@ open class Node(override val configuration: FullNodeConfiguration, override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress) override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService + private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1" + // DISCUSSION // // We use a single server thread for now, which means all message handling is serialized. @@ -127,7 +132,7 @@ open class Node(override val configuration: FullNodeConfiguration, // // The primary work done by the server thread is execution of flow logics, and related // serialisation/deserialisation work. - override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1) + override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread-$sameVmNodeNumber", 1) private var messageBroker: ArtemisMessagingServer? = null diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 45e2b6f9b0..1105c4981c 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -157,23 +157,30 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) } override fun addNode(node: NodeInfo) { + logger.info("Adding node with info: $node") synchronized(_changed) { val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one if (previousNode == null) { + logger.info("No previous node found") serviceHub.database.transaction { updateInfoDB(node) changePublisher.onNext(MapChange.Added(node)) } } else if (previousNode != node) { + logger.info("Previous node was found as: $previousNode") serviceHub.database.transaction { updateInfoDB(node) changePublisher.onNext(MapChange.Modified(node, previousNode)) } + } else { + logger.info("Previous node was identical to incoming one - doing nothing") } } + logger.info("Done adding node with info: $node") } override fun removeNode(node: NodeInfo) { + logger.info("Removing node with info: $node") synchronized(_changed) { registeredNodes.remove(node.legalIdentities.first().owningKey) serviceHub.database.transaction { @@ -181,6 +188,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) changePublisher.onNext(MapChange.Removed(node)) } } + logger.info("Done removing node with info: $node") } /** diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index da94d376aa..0d34b7d142 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -25,6 +25,10 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { val addressesMap: HashMap = HashMap() val infos: MutableSet = HashSet() + companion object { + val logger = loggerFor() + } + @Before fun start() { val nodes = startNodesWithPort(partiesList) @@ -127,10 +131,13 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() }) } charlie.internals.nodeReadyFuture.get() // Finish registration. + logger.info("Checking connectivity") checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. + logger.info("Loading caches") val cacheA = otherNodes[0].services.networkMapCache.allNodes val cacheB = otherNodes[1].services.networkMapCache.allNodes val cacheC = charlie.services.networkMapCache.allNodes + logger.info("Performing verification") assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap assertThat(cacheB).contains(charlie.info) assertEquals(cacheA.toSet(), cacheB.toSet()) @@ -158,9 +165,11 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { private fun checkConnectivity(nodes: List>) { nodes.forEach { node1 -> nodes.forEach { node2 -> - node2.internals.registerInitiatedFlow(SendBackFlow::class.java) - val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture - assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") + if(!(node1 === node2)) { // Do not check connectivity to itself + node2.internals.registerInitiatedFlow(SendBackFlow::class.java) + val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture + assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") + } } } } @@ -169,8 +178,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { private class SendFlow(val otherParty: Party) : FlowLogic() { @Suspendable override fun call(): String { - println("SEND FLOW to $otherParty") - println("Party key ${otherParty.owningKey.toBase58String()}") + logger.info("SEND FLOW to $otherParty") + logger.info("Party key ${otherParty.owningKey.toBase58String()}") val session = initiateFlow(otherParty) return session.sendAndReceive("Hi!").unwrap { it } } @@ -180,8 +189,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic() { @Suspendable override fun call() { - println("SEND BACK FLOW to ${otherSideSession.counterparty}") - println("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}") + logger.info("SEND BACK FLOW to ${otherSideSession.counterparty}") + logger.info("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}") otherSideSession.send("Hello!") } }