mirror of
https://github.com/corda/corda.git
synced 2025-02-01 00:45:59 +00:00
Remove TestWithInMemoryNetwork
* Replace use of TestWithInMemoryNetwork by InMemoryMessagingTests with MockNetwork * Replace use of TestWithInMemoryNetwork by TimestamperNodeServiceTest with MockNetwork * Remove reference to TestWithInMemoryNetwork from TwoPartyTradeProtocolTests
This commit is contained in:
parent
5ce7580470
commit
78076ace3b
@ -90,7 +90,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
|
||||
}
|
||||
|
||||
/** Returns a started node, optionally created by the passed factory method */
|
||||
fun createNode(withTimestamper: NodeInfo?, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
fun createNode(withTimestamper: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
advertisedServices: Set<ServiceType> = emptySet()): MockNode {
|
||||
val newNode = forcedID == -1
|
||||
val id = if (newNode) counter++ else forcedID
|
||||
|
@ -3,57 +3,24 @@
|
||||
package core.messaging
|
||||
|
||||
import core.serialization.deserialize
|
||||
import core.testing.InMemoryMessagingNetwork
|
||||
import org.junit.After
|
||||
import core.testing.MockNetwork
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFails
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
open class TestWithInMemoryNetwork {
|
||||
val nodes: MutableMap<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> = HashMap()
|
||||
lateinit var network: InMemoryMessagingNetwork
|
||||
class InMemoryMessagingTests {
|
||||
lateinit var network: MockNetwork
|
||||
|
||||
fun makeNode(inBackground: Boolean = false): Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging> {
|
||||
// The manuallyPumped = true bit means that we must call the pump method on the system in order to
|
||||
val (address, builder) = network.createNode(!inBackground)
|
||||
val node = builder.start().get()
|
||||
nodes[address] = node
|
||||
return Pair(address, node)
|
||||
init {
|
||||
// BriefLogFormatter.initVerbose()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setupNetwork() {
|
||||
network = InMemoryMessagingNetwork()
|
||||
nodes.clear()
|
||||
}
|
||||
|
||||
@After
|
||||
fun stopNetwork() {
|
||||
network.stop()
|
||||
}
|
||||
|
||||
fun pumpAll(blocking: Boolean) = network.endpoints.map { it.pump(blocking) }
|
||||
|
||||
// Keep calling "pump" in rounds until every node in the network reports that it had nothing to do
|
||||
fun <T> runNetwork(body: () -> T): T {
|
||||
val result = body()
|
||||
runNetwork()
|
||||
return result
|
||||
}
|
||||
|
||||
fun runNetwork() {
|
||||
while (pumpAll(false).any { it }) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryMessagingTests : TestWithInMemoryNetwork() {
|
||||
init {
|
||||
// BriefLogFormatter.initVerbose()
|
||||
fun setUp() {
|
||||
network = MockNetwork()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -73,46 +40,45 @@ class InMemoryMessagingTests : TestWithInMemoryNetwork() {
|
||||
|
||||
@Test
|
||||
fun basics() {
|
||||
val (addr1, node1) = makeNode()
|
||||
val (addr2, node2) = makeNode()
|
||||
val (addr3, node3) = makeNode()
|
||||
val node1 = network.createNode()
|
||||
val node2 = network.createNode()
|
||||
val node3 = network.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
var finalDelivery: Message? = null
|
||||
|
||||
with(node2) {
|
||||
addMessageHandler { msg, registration ->
|
||||
send(msg, addr3)
|
||||
node2.net.addMessageHandler { msg, registration ->
|
||||
node2.net.send(msg, node3.info.address)
|
||||
}
|
||||
}
|
||||
|
||||
with(node3) {
|
||||
addMessageHandler { msg, registration ->
|
||||
node2.net.addMessageHandler { msg, registration ->
|
||||
finalDelivery = msg
|
||||
}
|
||||
}
|
||||
|
||||
// Node 1 sends a message and it should end up in finalDelivery, after we pump each node.
|
||||
runNetwork {
|
||||
node1.send(node1.createMessage("test.topic", bits), addr2)
|
||||
}
|
||||
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
|
||||
node1.net.send(node1.net.createMessage("test.topic", bits), node2.info.address)
|
||||
|
||||
network.runNetwork(rounds = 1)
|
||||
|
||||
assertTrue(Arrays.equals(finalDelivery!!.data, bits))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun broadcast() {
|
||||
val (addr1, node1) = makeNode()
|
||||
val (addr2, node2) = makeNode()
|
||||
val (addr3, node3) = makeNode()
|
||||
val node1 = network.createNode()
|
||||
val node2 = network.createNode()
|
||||
val node3 = network.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
|
||||
var counter = 0
|
||||
listOf(node1, node2, node3).forEach { it.addMessageHandler { msg, registration -> counter++ } }
|
||||
runNetwork {
|
||||
node1.send(node2.createMessage("test.topic", bits), network.everyoneOnline)
|
||||
}
|
||||
listOf(node1, node2, node3).forEach { it.net.addMessageHandler { msg, registration -> counter++ } }
|
||||
node1.net.send(node2.net.createMessage("test.topic", bits), network.messagingNetwork.everyoneOnline)
|
||||
network.runNetwork(rounds = 1)
|
||||
assertEquals(3, counter)
|
||||
}
|
||||
|
||||
@ -121,34 +87,34 @@ class InMemoryMessagingTests : TestWithInMemoryNetwork() {
|
||||
// Test (re)delivery of messages to nodes that aren't created yet, or were stopped and then restarted.
|
||||
// The purpose of this functionality is to simulate a reliable messaging system that keeps trying until
|
||||
// messages are delivered.
|
||||
val (addr1, node1) = makeNode()
|
||||
var (addr2, node2) = makeNode()
|
||||
val node1 = network.createNode()
|
||||
var node2 = network.createNode()
|
||||
|
||||
node1.send("test.topic", addr2, "hello!")
|
||||
node2.pump(false) // No handler registered, so the message goes into a holding area.
|
||||
node1.net.send("test.topic", node2.info.address, "hello!")
|
||||
network.runNetwork(rounds = 1) // No handler registered, so the message goes into a holding area.
|
||||
var runCount = 0
|
||||
node2.addMessageHandler("test.topic") { msg, registration ->
|
||||
node2.net.addMessageHandler("test.topic") { msg, registration ->
|
||||
if (msg.data.deserialize<String>() == "hello!")
|
||||
runCount++
|
||||
}
|
||||
node2.pump(false) // Try again now the handler is registered
|
||||
network.runNetwork(rounds = 1) // Try again now the handler is registered
|
||||
assertEquals(1, runCount)
|
||||
|
||||
// Shut node2 down for a while. Node 1 keeps sending it messages though.
|
||||
node2.stop()
|
||||
|
||||
node1.send("test.topic", addr2, "are you there?")
|
||||
node1.send("test.topic", addr2, "wake up!")
|
||||
node1.net.send("test.topic", node2.info.address, "are you there?")
|
||||
node1.net.send("test.topic", node2.info.address, "wake up!")
|
||||
|
||||
// Now re-create node2 with the same address as last time, and re-register a message handler.
|
||||
// Check that the messages that were sent whilst it was gone are still there, waiting for it.
|
||||
node2 = network.createNodeWithID(true, addr2.id).start().get()
|
||||
node2.addMessageHandler("test.topic") { a, b -> runCount++ }
|
||||
assertTrue(node2.pump(false))
|
||||
node2 = network.createNode(null, node2.id)
|
||||
node2.net.addMessageHandler("test.topic") { a, b -> runCount++ }
|
||||
network.runNetwork(rounds = 1)
|
||||
assertEquals(2, runCount)
|
||||
assertTrue(node2.pump(false))
|
||||
network.runNetwork(rounds = 1)
|
||||
assertEquals(3, runCount)
|
||||
assertFalse(node2.pump(false))
|
||||
network.runNetwork(rounds = 1)
|
||||
assertEquals(3, runCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ import kotlin.test.assertTrue
|
||||
*
|
||||
* We assume that Alice and Bob already found each other via some market, and have agreed the details already.
|
||||
*/
|
||||
class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
class TwoPartyTradeProtocolTests {
|
||||
lateinit var net: MockNetwork
|
||||
|
||||
@Before
|
||||
@ -155,7 +155,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
// that Bob was waiting on before the reboot occurred.
|
||||
bobNode = net.createNode(timestamperAddr, bobAddr.id, object : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
|
||||
return object : MockNetwork.MockNode(dir, config, net, timestamperAddr, bobAddr.id) {
|
||||
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, bobAddr.id) {
|
||||
override fun initialiseStorageService(dir: Path): StorageService {
|
||||
val ss = super.initialiseStorageService(dir)
|
||||
val smMap = ss.stateMachines
|
||||
@ -184,7 +184,7 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
// of gets and puts.
|
||||
private fun makeNodeWithTracking(name: String): MockNetwork.MockNode {
|
||||
// Create a node in the mock network ...
|
||||
return net.createNode(null, nodeFactory = object : MockNetwork.Factory {
|
||||
return net.createNode(nodeFactory = object : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
|
||||
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, id) {
|
||||
// That constructs the storage service object in a customised way ...
|
||||
|
@ -3,33 +3,27 @@ package core.node
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.StateMachineManager
|
||||
import core.messaging.TestWithInMemoryNetwork
|
||||
import core.node.services.*
|
||||
import core.protocols.ProtocolLogic
|
||||
import core.serialization.serialize
|
||||
import core.testing.InMemoryMessagingNetwork
|
||||
import core.testing.MockNetworkMapCache
|
||||
import core.testutils.ALICE
|
||||
import core.testutils.ALICE_KEY
|
||||
import core.testing.MockNetwork
|
||||
import core.testutils.CASH
|
||||
import core.utilities.BriefLogFormatter
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import protocols.TimestampingProtocol
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
// TODO: Refactor this to use MockNode.
|
||||
class TimestamperNodeServiceTest {
|
||||
lateinit var network: MockNetwork
|
||||
|
||||
class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
lateinit var myMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
|
||||
lateinit var serviceMessaging: Pair<InMemoryMessagingNetwork.Handle, InMemoryMessagingNetwork.InMemoryMessaging>
|
||||
lateinit var service: NodeTimestamperService
|
||||
init {
|
||||
BriefLogFormatter.initVerbose("dlg.timestamping.request")
|
||||
}
|
||||
|
||||
val ptx = TransactionBuilder().apply {
|
||||
addInputState(StateRef(SecureHash.randomSHA256(), 0))
|
||||
@ -37,25 +31,10 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
}
|
||||
|
||||
val clock = Clock.fixed(Instant.now(), ZoneId.systemDefault())
|
||||
lateinit var mockServices: ServiceHub
|
||||
lateinit var serverKey: PublicKey
|
||||
|
||||
init {
|
||||
BriefLogFormatter.initVerbose("dlg.timestamping.request")
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
myMessaging = makeNode()
|
||||
serviceMessaging = makeNode()
|
||||
mockServices = MockServices(net = serviceMessaging.second, storage = MockStorageService())
|
||||
|
||||
val timestampingNodeID = network.setupTimestampingNode(true).first
|
||||
(mockServices.networkMapCache as MockNetworkMapCache).timestampingNodes.add(timestampingNodeID)
|
||||
serverKey = timestampingNodeID.identity.owningKey
|
||||
|
||||
// And a separate one to be tested directly, to make the unit tests a bit faster.
|
||||
service = NodeTimestamperService(serviceMessaging.second, Party("Unit test suite", ALICE), ALICE_KEY)
|
||||
network = MockNetwork()
|
||||
}
|
||||
|
||||
class TestPSM(val server: NodeInfo, val now: Instant) : ProtocolLogic<Boolean>() {
|
||||
@ -76,57 +55,73 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
|
||||
@Test
|
||||
fun successWithNetwork() {
|
||||
val psm = runNetwork {
|
||||
val smm = StateMachineManager(MockServices(net = myMessaging.second), RunOnCallerThread)
|
||||
val logName = NodeTimestamperService.TIMESTAMPING_PROTOCOL_TOPIC
|
||||
val psm = TestPSM(mockServices.networkMapCache.timestampingNodes[0], clock.instant())
|
||||
smm.add(logName, psm)
|
||||
}
|
||||
assertTrue(psm.isDone)
|
||||
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
|
||||
val logName = NodeTimestamperService.TIMESTAMPING_PROTOCOL_TOPIC
|
||||
val psm = TestPSM(timestamperNode.info, clock.instant())
|
||||
val future = timestamperNode.smm.add(logName, psm)
|
||||
|
||||
network.runNetwork()
|
||||
assertTrue(future.isDone)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun wrongCommands() {
|
||||
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
|
||||
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
|
||||
val service = timestamperNode.inNodeTimestampingService!!
|
||||
|
||||
// Zero commands is not OK.
|
||||
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), myMessaging.first, Long.MIN_VALUE))
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), timestamperNode.info.address, Long.MIN_VALUE))
|
||||
}
|
||||
// More than one command is not OK.
|
||||
assertFailsWith(TimestampingError.RequiresExactlyOneCommand::class) {
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 30.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 40.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 30.seconds), timestamperKey)
|
||||
ptx.addCommand(TimestampCommand(clock.instant(), 40.seconds), timestamperKey)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), myMessaging.first, Long.MIN_VALUE))
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), timestamperNode.info.address, Long.MIN_VALUE))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun tooEarly() {
|
||||
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
|
||||
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
|
||||
val service = timestamperNode.inNodeTimestampingService!!
|
||||
|
||||
assertFailsWith(TimestampingError.NotOnTimeException::class) {
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), timestamperKey)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), myMessaging.first, Long.MIN_VALUE))
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), timestamperNode.info.address, Long.MIN_VALUE))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun tooLate() {
|
||||
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
|
||||
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
|
||||
val service = timestamperNode.inNodeTimestampingService!!
|
||||
|
||||
assertFailsWith(TimestampingError.NotOnTimeException::class) {
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(now - 60.seconds, now - 40.seconds), timestamperKey)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), myMessaging.first, Long.MIN_VALUE))
|
||||
service.processRequest(TimestampingProtocol.Request(wtx.serialize(), timestamperNode.info.address, Long.MIN_VALUE))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun success() {
|
||||
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
|
||||
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
|
||||
val service = timestamperNode.inNodeTimestampingService!!
|
||||
|
||||
val now = clock.instant()
|
||||
ptx.addCommand(TimestampCommand(now - 20.seconds, now + 20.seconds), ALICE)
|
||||
ptx.addCommand(TimestampCommand(now - 20.seconds, now + 20.seconds), timestamperKey)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
val sig = service.processRequest(TimestampingProtocol.Request(wtx.serialize(), myMessaging.first, Long.MIN_VALUE))
|
||||
val sig = service.processRequest(TimestampingProtocol.Request(wtx.serialize(), timestamperNode.info.address, Long.MIN_VALUE))
|
||||
ptx.checkAndAddSignature(sig)
|
||||
ptx.toSignedTransaction(false).verifySignatures()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user