mirror of
https://github.com/corda/corda.git
synced 2024-12-22 06:17:55 +00:00
Testing: make the in memory messaging service support logging of all sent messages with useful text descriptions of each node
This commit is contained in:
parent
c5217412a4
commit
883be19978
@ -7,6 +7,8 @@ import core.ThreadBox
|
|||||||
import core.crypto.sha256
|
import core.crypto.sha256
|
||||||
import core.messaging.*
|
import core.messaging.*
|
||||||
import core.utilities.loggerFor
|
import core.utilities.loggerFor
|
||||||
|
import core.utilities.trace
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
@ -27,6 +29,11 @@ import kotlin.concurrent.thread
|
|||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class InMemoryMessagingNetwork {
|
class InMemoryMessagingNetwork {
|
||||||
|
companion object {
|
||||||
|
val MESSAGES_LOG_NAME = "messages"
|
||||||
|
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
|
||||||
|
}
|
||||||
|
|
||||||
private var counter = 0 // -1 means stopped.
|
private var counter = 0 // -1 means stopped.
|
||||||
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
|
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
|
||||||
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
|
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
|
||||||
@ -55,9 +62,15 @@ class InMemoryMessagingNetwork {
|
|||||||
return Pair(id, builder)
|
return Pair(id, builder)
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
|
/**
|
||||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<InMemoryMessaging> {
|
* Creates a node at the given address: useful if you want to recreate a node to simulate a restart.
|
||||||
return Builder(manuallyPumped, Handle(id))
|
*
|
||||||
|
* @param manuallyPumped see [createNode]
|
||||||
|
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
|
||||||
|
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate
|
||||||
|
*/
|
||||||
|
fun createNodeWithID(manuallyPumped: Boolean, id: Int, description: String? = null): MessagingServiceBuilder<InMemoryMessaging> {
|
||||||
|
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _allMessages = PublishSubject.create<Triple<SingleMessageRecipient, Message, MessageRecipients>>()
|
private val _allMessages = PublishSubject.create<Triple<SingleMessageRecipient, Message, MessageRecipients>>()
|
||||||
@ -74,6 +87,7 @@ class InMemoryMessagingNetwork {
|
|||||||
|
|
||||||
@Synchronized
|
@Synchronized
|
||||||
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
|
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
|
||||||
|
log.trace { "${message.topic} from '${from.myAddress}' to '$recipients'" }
|
||||||
val calc = latencyCalculator
|
val calc = latencyCalculator
|
||||||
if (calc != null && recipients is SingleMessageRecipient) {
|
if (calc != null && recipients is SingleMessageRecipient) {
|
||||||
// Inject some artificial latency.
|
// Inject some artificial latency.
|
||||||
@ -133,8 +147,8 @@ class InMemoryMessagingNetwork {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Handle(val id: Int) : SingleMessageRecipient {
|
class Handle(val id: Int, val description: String) : SingleMessageRecipient {
|
||||||
override fun toString() = "In memory node $id"
|
override fun toString() = description
|
||||||
override fun equals(other: Any?) = other is Handle && other.id == id
|
override fun equals(other: Any?) = other is Handle && other.id == id
|
||||||
override fun hashCode() = id.hashCode()
|
override fun hashCode() = id.hashCode()
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
|
|||||||
|
|
||||||
override fun makeMessagingService(): MessagingService {
|
override fun makeMessagingService(): MessagingService {
|
||||||
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
|
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
|
||||||
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id).start().get()
|
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||||
|
Loading…
Reference in New Issue
Block a user