mirror of
https://github.com/corda/corda.git
synced 2025-05-31 06:31:08 +00:00
CORDA-649: Improving logging (#1699)
This commit is contained in:
parent
57e131d8a0
commit
c0dd8d338e
@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
@ -83,6 +84,8 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
private fun createClock(configuration: FullNodeConfiguration): Clock {
|
||||
return if (configuration.useTestClock) TestClock() else NodeClock()
|
||||
}
|
||||
|
||||
private val sameVmNodeCounter = AtomicInteger()
|
||||
}
|
||||
|
||||
override val log: Logger get() = logger
|
||||
@ -90,6 +93,8 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
|
||||
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
|
||||
|
||||
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
|
||||
|
||||
// DISCUSSION
|
||||
//
|
||||
// We use a single server thread for now, which means all message handling is serialized.
|
||||
@ -127,7 +132,7 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
//
|
||||
// The primary work done by the server thread is execution of flow logics, and related
|
||||
// serialisation/deserialisation work.
|
||||
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
|
||||
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread-$sameVmNodeNumber", 1)
|
||||
|
||||
private var messageBroker: ArtemisMessagingServer? = null
|
||||
|
||||
|
@ -157,23 +157,30 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
logger.info("Adding node with info: $node")
|
||||
synchronized(_changed) {
|
||||
val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one
|
||||
if (previousNode == null) {
|
||||
logger.info("No previous node found")
|
||||
serviceHub.database.transaction {
|
||||
updateInfoDB(node)
|
||||
changePublisher.onNext(MapChange.Added(node))
|
||||
}
|
||||
} else if (previousNode != node) {
|
||||
logger.info("Previous node was found as: $previousNode")
|
||||
serviceHub.database.transaction {
|
||||
updateInfoDB(node)
|
||||
changePublisher.onNext(MapChange.Modified(node, previousNode))
|
||||
}
|
||||
} else {
|
||||
logger.info("Previous node was identical to incoming one - doing nothing")
|
||||
}
|
||||
}
|
||||
logger.info("Done adding node with info: $node")
|
||||
}
|
||||
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
logger.info("Removing node with info: $node")
|
||||
synchronized(_changed) {
|
||||
registeredNodes.remove(node.legalIdentities.first().owningKey)
|
||||
serviceHub.database.transaction {
|
||||
@ -181,6 +188,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
changePublisher.onNext(MapChange.Removed(node))
|
||||
}
|
||||
}
|
||||
logger.info("Done removing node with info: $node")
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -25,6 +25,10 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
|
||||
val infos: MutableSet<NodeInfo> = HashSet()
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<PersistentNetworkMapCacheTest>()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
val nodes = startNodesWithPort(partiesList)
|
||||
@ -127,10 +131,13 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() })
|
||||
}
|
||||
charlie.internals.nodeReadyFuture.get() // Finish registration.
|
||||
logger.info("Checking connectivity")
|
||||
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
|
||||
logger.info("Loading caches")
|
||||
val cacheA = otherNodes[0].services.networkMapCache.allNodes
|
||||
val cacheB = otherNodes[1].services.networkMapCache.allNodes
|
||||
val cacheC = charlie.services.networkMapCache.allNodes
|
||||
logger.info("Performing verification")
|
||||
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
|
||||
assertThat(cacheB).contains(charlie.info)
|
||||
assertEquals(cacheA.toSet(), cacheB.toSet())
|
||||
@ -158,9 +165,11 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
private fun checkConnectivity(nodes: List<StartedNode<*>>) {
|
||||
nodes.forEach { node1 ->
|
||||
nodes.forEach { node2 ->
|
||||
node2.internals.registerInitiatedFlow(SendBackFlow::class.java)
|
||||
val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture
|
||||
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
|
||||
if(!(node1 === node2)) { // Do not check connectivity to itself
|
||||
node2.internals.registerInitiatedFlow(SendBackFlow::class.java)
|
||||
val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture
|
||||
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -169,8 +178,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
private class SendFlow(val otherParty: Party) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
println("SEND FLOW to $otherParty")
|
||||
println("Party key ${otherParty.owningKey.toBase58String()}")
|
||||
logger.info("SEND FLOW to $otherParty")
|
||||
logger.info("Party key ${otherParty.owningKey.toBase58String()}")
|
||||
val session = initiateFlow(otherParty)
|
||||
return session.sendAndReceive<String>("Hi!").unwrap { it }
|
||||
}
|
||||
@ -180,8 +189,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
println("SEND BACK FLOW to ${otherSideSession.counterparty}")
|
||||
println("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}")
|
||||
logger.info("SEND BACK FLOW to ${otherSideSession.counterparty}")
|
||||
logger.info("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}")
|
||||
otherSideSession.send("Hello!")
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user