mirror of
https://github.com/corda/corda.git
synced 2025-01-31 00:24:59 +00:00
Minor: rename InMemoryNetwork/Node to talk more about messaging, to deconflict with the new mock node (which ties together all the various node services).
This commit is contained in:
parent
6bdfbb2a4f
commit
5bd8a3408b
@ -45,7 +45,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
|
||||
// We will run as much stuff in this thread as possible to keep the risk of thread safety bugs low during the
|
||||
// low-performance prototyping period.
|
||||
val serverThread = Executors.newSingleThreadExecutor()
|
||||
protected open val serverThread = Executors.newSingleThreadExecutor()
|
||||
|
||||
val services = object : ServiceHub {
|
||||
override val networkService: MessagingService get() = net
|
||||
@ -56,6 +56,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
override val identityService: IdentityService get() = identity
|
||||
}
|
||||
|
||||
val legallyIdentifableAddress: LegallyIdentifiableNode get() = LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
|
||||
|
||||
// TODO: This will be obsoleted by "PLT-12: Basic module/sandbox system for contracts"
|
||||
protected val contractFactory = object : ContractFactory {
|
||||
private val contracts = mapOf(
|
||||
@ -113,6 +115,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
return this
|
||||
}
|
||||
|
||||
open fun stop() {
|
||||
net.stop()
|
||||
serverThread.shutdownNow()
|
||||
}
|
||||
|
||||
protected abstract fun makeMessagingService(): MessagingService
|
||||
|
||||
protected fun makeStorageService(dir: Path): StorageService {
|
||||
|
@ -73,10 +73,9 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
|
||||
return this
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
override fun stop() {
|
||||
webServer.stop()
|
||||
net.stop()
|
||||
serverThread.shutdownNow()
|
||||
super.stop()
|
||||
nodeFileLock!!.release()
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,10 @@ package core.node
|
||||
|
||||
import java.util.*
|
||||
|
||||
interface NodeConfiguration {
|
||||
val myLegalName: String
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple wrapper around a plain old Java .properties file. The keys have the same name as in the source code.
|
||||
*
|
||||
@ -17,6 +21,6 @@ import java.util.*
|
||||
* We want to be able to configure via a GUI too, so an ability to round-trip whitespace, comments etc when machine
|
||||
* editing the file is a must-have.
|
||||
*/
|
||||
class NodeConfiguration(private val properties: Properties) {
|
||||
val myLegalName: String by properties
|
||||
class NodeConfigurationFromProperties(private val properties: Properties) : NodeConfiguration {
|
||||
override val myLegalName: String by properties
|
||||
}
|
||||
|
@ -251,7 +251,7 @@ private fun loadConfigFile(configFile: Path): NodeConfiguration {
|
||||
Properties().apply { load(it) }
|
||||
}
|
||||
|
||||
val config = NodeConfiguration(configProps)
|
||||
val config = NodeConfigurationFromProperties(configProps)
|
||||
|
||||
// Make sure admin did actually edit at least the legal name.
|
||||
if (config.myLegalName == defaultLegalName)
|
||||
|
@ -24,35 +24,35 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
|
||||
* An in-memory network allows you to manufacture [InMemoryNode]s for a set of participants. Each
|
||||
* [InMemoryNode] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
|
||||
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
|
||||
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
|
||||
* testing).
|
||||
*/
|
||||
@ThreadSafe
|
||||
class InMemoryNetwork {
|
||||
class InMemoryMessagingNetwork {
|
||||
private var counter = 0 // -1 means stopped.
|
||||
private val handleNodeMap = HashMap<Handle, InMemoryNode>()
|
||||
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
|
||||
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
|
||||
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
|
||||
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
|
||||
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
|
||||
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<Message>>()
|
||||
|
||||
val nodes: List<InMemoryNode> @Synchronized get() = handleNodeMap.values.toList()
|
||||
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
|
||||
|
||||
/**
|
||||
* Creates a node and returns the new object that identifies its location on the network to senders, and the
|
||||
* [InMemoryNode] that the recipient/in-memory node uses to receive messages and send messages itself.
|
||||
* [InMemoryMessaging] that the recipient/in-memory node uses to receive messages and send messages itself.
|
||||
*
|
||||
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryNode.pump] method on the [InMemoryNode]
|
||||
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryMessaging.pump] method on the [InMemoryMessaging]
|
||||
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
|
||||
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
|
||||
* executor.
|
||||
*/
|
||||
@Synchronized
|
||||
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryNode>> {
|
||||
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
check(counter >= 0) { "In memory network stopped: please recreate."}
|
||||
val builder = createNodeWithID(manuallyPumped, counter) as Builder
|
||||
counter++
|
||||
@ -61,19 +61,19 @@ class InMemoryNetwork {
|
||||
}
|
||||
|
||||
/** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
|
||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<InMemoryNode> {
|
||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<InMemoryMessaging> {
|
||||
return Builder(manuallyPumped, Handle(id))
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
private fun netSend(message: Message, recipients: MessageRecipients) {
|
||||
private fun msgSend(message: Message, recipients: MessageRecipients) {
|
||||
when (recipients) {
|
||||
is Handle -> getQueueForHandle(recipients).add(message)
|
||||
|
||||
is AllPossibleRecipients -> {
|
||||
// This means all possible recipients _that the network knows about at the time_, not literally everyone
|
||||
// who joins into the indefinite future.
|
||||
for (handle in handleNodeMap.keys)
|
||||
for (handle in handleEndpointMap.keys)
|
||||
getQueueForHandle(handle).add(message)
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unknown type of recipient handle")
|
||||
@ -82,7 +82,7 @@ class InMemoryNetwork {
|
||||
|
||||
@Synchronized
|
||||
private fun netNodeHasShutdown(handle: Handle) {
|
||||
handleNodeMap.remove(handle)
|
||||
handleEndpointMap.remove(handle)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
@ -93,21 +93,21 @@ class InMemoryNetwork {
|
||||
fun stop() {
|
||||
val nodes = synchronized(this) {
|
||||
counter = -1
|
||||
handleNodeMap.values.toList()
|
||||
handleEndpointMap.values.toList()
|
||||
}
|
||||
|
||||
for (node in nodes)
|
||||
node.stop()
|
||||
|
||||
handleNodeMap.clear()
|
||||
handleEndpointMap.clear()
|
||||
messageQueues.clear()
|
||||
}
|
||||
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryNode> {
|
||||
override fun start(): ListenableFuture<InMemoryNode> {
|
||||
synchronized(this@InMemoryNetwork) {
|
||||
val node = InMemoryNode(manuallyPumped, id)
|
||||
handleNodeMap[id] = node
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
override fun start(): ListenableFuture<InMemoryMessaging> {
|
||||
synchronized(this@InMemoryMessagingNetwork) {
|
||||
val node = InMemoryMessaging(manuallyPumped, id)
|
||||
handleEndpointMap[id] = node
|
||||
return Futures.immediateFuture(node)
|
||||
}
|
||||
}
|
||||
@ -122,7 +122,7 @@ class InMemoryNetwork {
|
||||
private var timestampingAdvert: LegallyIdentifiableNode? = null
|
||||
|
||||
@Synchronized
|
||||
fun setupTimestampingNode(manuallyPumped: Boolean): Pair<LegallyIdentifiableNode, InMemoryNode> {
|
||||
fun setupTimestampingNode(manuallyPumped: Boolean): Pair<LegallyIdentifiableNode, InMemoryMessaging> {
|
||||
check(timestampingAdvert == null)
|
||||
val (handle, builder) = createNode(manuallyPumped)
|
||||
val node = builder.start().get()
|
||||
@ -132,14 +132,14 @@ class InMemoryNetwork {
|
||||
}
|
||||
|
||||
/**
|
||||
* An [InMemoryNode] provides a [MessagingService] that isn't backed by any kind of network or disk storage
|
||||
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
|
||||
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
|
||||
* when all entities on 'the network' are being simulated in-process.
|
||||
*
|
||||
* An instance can be obtained by creating a builder and then using the start method.
|
||||
*/
|
||||
@ThreadSafe
|
||||
inner class InMemoryNode(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
|
||||
inner class Handler(val executor: Executor?, val topic: String,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
@Volatile
|
||||
@ -172,7 +172,7 @@ class InMemoryNetwork {
|
||||
Pair(handler, items)
|
||||
}
|
||||
for (it in items)
|
||||
netSend(it, handle)
|
||||
msgSend(it, handle)
|
||||
return handler
|
||||
}
|
||||
|
||||
@ -183,7 +183,7 @@ class InMemoryNetwork {
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
check(running)
|
||||
netSend(message, target)
|
||||
msgSend(message, target)
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
@ -245,7 +245,7 @@ class InMemoryNetwork {
|
||||
try {
|
||||
handler.callback(message, handler)
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
|
||||
}
|
||||
}
|
||||
}
|
@ -21,10 +21,10 @@ import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
open class TestWithInMemoryNetwork {
|
||||
val nodes: MutableMap<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode> = HashMap()
|
||||
lateinit var network: InMemoryNetwork
|
||||
val nodes: MutableMap<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> = HashMap()
|
||||
lateinit var network: InMemoryMessagingNetwork
|
||||
|
||||
fun makeNode(inBackground: Boolean = false): Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode> {
|
||||
fun makeNode(inBackground: Boolean = false): Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> {
|
||||
// The manuallyPumped = true bit means that we must call the pump method on the system in order to
|
||||
val (address, builder) = network.createNode(!inBackground)
|
||||
val node = builder.start().get()
|
||||
@ -34,7 +34,7 @@ open class TestWithInMemoryNetwork {
|
||||
|
||||
@Before
|
||||
fun setupNetwork() {
|
||||
network = InMemoryNetwork()
|
||||
network = InMemoryMessagingNetwork()
|
||||
nodes.clear()
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ open class TestWithInMemoryNetwork {
|
||||
network.stop()
|
||||
}
|
||||
|
||||
fun pumpAll(blocking: Boolean) = network.nodes.map { it.pump(blocking) }
|
||||
fun pumpAll(blocking: Boolean) = network.endpoints.map { it.pump(blocking) }
|
||||
|
||||
// Keep calling "pump" in rounds until every node in the network reports that it had nothing to do
|
||||
fun <T> runNetwork(body: () -> T): T {
|
||||
|
@ -28,8 +28,8 @@ import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
lateinit var myNode: Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode>
|
||||
lateinit var serviceNode: Pair<InMemoryNetwork.Handle, InMemoryNetwork.InMemoryNode>
|
||||
lateinit var myMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
|
||||
lateinit var serviceMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
|
||||
lateinit var service: TimestamperNodeService
|
||||
|
||||
val ptx = TransactionBuilder().apply {
|
||||
@ -47,16 +47,16 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
myNode = makeNode()
|
||||
serviceNode = makeNode()
|
||||
mockServices = MockServices(net = serviceNode.second, storage = MockStorageService())
|
||||
myMessaging = makeNode()
|
||||
serviceMessaging = makeNode()
|
||||
mockServices = MockServices(net = serviceMessaging.second, storage = MockStorageService())
|
||||
|
||||
val timestampingNodeID = network.setupTimestampingNode(true).first
|
||||
(mockServices.networkMapService as MockNetworkMap).timestampingNodes.add(timestampingNodeID)
|
||||
serverKey = timestampingNodeID.identity.owningKey
|
||||
|
||||
// And a separate one to be tested directly, to make the unit tests a bit faster.
|
||||
service = TimestamperNodeService(serviceNode.second, Party("Unit test suite", ALICE), ALICE_KEY)
|
||||
service = TimestamperNodeService(serviceMessaging.second, Party("Unit test suite", ALICE), ALICE_KEY)
|
||||
}
|
||||
|
||||
class TestPSM(val server: LegallyIdentifiableNode, val now: Instant) : ProtocolLogic<Boolean>() {
|
||||
@ -78,7 +78,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
@Test
|
||||
fun successWithNetwork() {
|
||||
val psm = runNetwork {
|
||||
val smm = StateMachineManager(MockServices(net = myNode.second), RunOnCallerThread)
|
||||
val smm = StateMachineManager(MockServices(net = myMessaging.second), RunOnCallerThread)
|
||||
val logName = TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC
|
||||
val psm = TestPSM(mockServices.networkMapService.timestampingNodes[0], clock.instant())
|
||||
smm.add(logName, psm)
|
||||
@ -91,14 +91,14 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
// Zero commands is not OK.
|
||||
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
|
||||
}
|
||||
// More than one command is not OK.
|
||||
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 30.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 40.seconds), ALICE)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,7 +118,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
|
||||
service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 20.seconds, now + 20.seconds), ALICE)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
val sig = service.processRequest(TimestampingMessages.Request(wtx.serialize(), myNode.first, "ignored"))
|
||||
val sig = service.processRequest(TimestampingMessages.Request(wtx.serialize(), myMessaging.first, "ignored"))
|
||||
ptx.checkAndAddSignature(sig)
|
||||
ptx.toSignedTransaction(false).verifySignatures()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user