The integration tests for demos now spawn a new JVM instead of using threads. Demos no longer need or contain any in memory node logic.

This commit is contained in:
Clinton Alexander 2016-06-09 16:42:06 +01:00 committed by Andras Slemmer
parent 22dd36950c
commit 03e2852880
6 changed files with 81 additions and 171 deletions

View File

@ -1,8 +1,5 @@
package com.r3corda.core.testing package com.r3corda.core.testing
import com.r3corda.demos.DemoConfig
import com.r3corda.demos.runIRSDemo
import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
import org.junit.Test import org.junit.Test
import java.nio.file.Path import java.nio.file.Path
@ -12,51 +9,53 @@ class IRSDemoTest {
@Test fun `runs IRS demo`() { @Test fun `runs IRS demo`() {
val dirA = Paths.get("./nodeA") val dirA = Paths.get("./nodeA")
val dirB = Paths.get("./nodeB") val dirB = Paths.get("./nodeB")
var procA: Process? = null
var procB: Process? = null
try { try {
setupNode(dirA, "NodeA") setupNode(dirA, "NodeA")
setupNode(dirB, "NodeB") setupNode(dirB, "NodeB")
val threadA = startNode(dirA, "NodeA") procA = startNode(dirA, "NodeA")
val threadB = startNode(dirB, "NodeB") procB = startNode(dirB, "NodeB")
runTrade() runTrade()
runDateChange() runDateChange()
stopNode(threadA)
stopNode(threadB)
} finally { } finally {
stopNode(procA)
stopNode(procB)
cleanup(dirA) cleanup(dirA)
cleanup(dirB) cleanup(dirB)
} }
} }
} }
private fun setupNode(dir: Path, nodeType: String) { private fun setupNode(dir: Path, nodeType: String) {
runIRSDemo(arrayOf("--role", "Setup" + nodeType, "--dir", dir.toString())) val args = listOf("--role", "Setup" + nodeType, "--dir", dir.toString())
val proc = spawn("com.r3corda.demos.IRSDemoKt", args)
proc.waitFor();
assertEquals(proc.exitValue(), 0)
} }
private fun startNode(dir: Path, nodeType: String): Thread { private fun startNode(dir: Path, nodeType: String): Process {
val config = DemoConfig(true) val args = listOf("--role", nodeType, "--dir", dir.toString())
val nodeThread = thread(true, false, null, nodeType, -1, { val proc = spawn("com.r3corda.demos.IRSDemoKt", args)
try { Thread.sleep(15000)
runIRSDemo(arrayOf("--role", nodeType, "--dir", dir.toString()), config) return proc
} finally {
// Will only reach here during error or after node is stopped, so ensure lock is unlocked.
config.nodeReady.countDown()
}
})
config.nodeReady.await()
return nodeThread
} }
private fun runTrade() { private fun runTrade() {
assertEquals(runIRSDemo(arrayOf("--role", "Trade", "trade1")), 0) val args = listOf("--role", "Trade", "trade1")
val proc = spawn("com.r3corda.demos.IRSDemoKt", args)
proc.waitFor();
assertEquals(proc.exitValue(), 0)
} }
private fun runDateChange() { private fun runDateChange() {
assertEquals(runIRSDemo(arrayOf("--role", "Date", "2017-01-02")), 0) val args = listOf("--role", "Date", "2017-01-02")
val proc = spawn("com.r3corda.demos.IRSDemoKt", args)
proc.waitFor();
assertEquals(proc.exitValue(), 0)
} }
private fun stopNode(nodeThread: Thread) { private fun stopNode(nodeProc: Process?) {
// The demo is designed to exit on interrupt nodeProc?.destroy()
nodeThread.interrupt()
} }
private fun cleanup(dir: Path) { private fun cleanup(dir: Path) {

View File

@ -0,0 +1,15 @@
package com.r3corda.core.testing
import java.nio.file.Paths
fun spawn(className: String, args: List<String>): Process {
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val javaArgs = listOf(path, "-javaagent:lib/quasar.jar", "-cp", classpath, className)
val builder = ProcessBuilder(javaArgs + args)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()
val process = builder.start();
return process
}

View File

@ -1,39 +1,34 @@
package com.r3corda.core.testing package com.r3corda.core.testing
import com.r3corda.demos.DemoConfig
import com.r3corda.demos.runTraderDemo
import org.junit.Test import org.junit.Test
import java.nio.file.Paths import java.nio.file.Paths
import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
class TraderDemoTest { class TraderDemoTest {
@Test fun `runs trader demo`() { @Test fun `runs trader demo`() {
var nodeProc: Process? = null
try { try {
runBuyer() nodeProc = runBuyer()
runSeller() runSeller()
} finally { } finally {
nodeProc?.destroy()
cleanup() cleanup()
} }
} }
} }
private fun runBuyer() { private fun runBuyer(): Process {
val config = DemoConfig(true) val args = listOf("--role", "BUYER")
thread(true, false, null, "Buyer", -1, { val proc = spawn("com.r3corda.demos.TraderDemoKt", args)
try { Thread.sleep(15000)
runTraderDemo(arrayOf("--role", "BUYER"), config) return proc
} finally {
// Will only reach here during error or after node is stopped, so ensure lock is unlocked.
config.nodeReady.countDown()
}
})
config.nodeReady.await()
} }
private fun runSeller() { private fun runSeller() {
val config = DemoConfig(true) val args = listOf("--role", "SELLER")
assertEquals(runTraderDemo(arrayOf("--role", "SELLER"), config), 0) val proc = spawn("com.r3corda.demos.TraderDemoKt", args)
proc.waitFor();
assertEquals(proc.exitValue(), 0)
} }
private fun cleanup() { private fun cleanup() {

View File

@ -1,33 +0,0 @@
package com.r3corda.demos
import com.google.common.net.HostAndPort
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.internal.Node
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import java.nio.file.Path
import java.time.Clock
import java.util.concurrent.CountDownLatch
val messageNetwork = InMemoryMessagingNetwork()
class DemoNode(messagingService: MessagingService, dir: Path, p2pAddr: HostAndPort, config: NodeConfiguration,
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = NodeClock(), clientAPIs: List<Class<*>> = listOf())
: Node(dir, p2pAddr, config, networkMapAddress, advertisedServices, clock, clientAPIs) {
val messagingService = messagingService
override fun makeMessagingService(): MessagingService {
return messagingService
}
override fun startMessagingService() = Unit
}
class DemoConfig(useInMemoryMessaging: Boolean = false) {
val inMemory = useInMemoryMessaging
val nodeReady = CountDownLatch(1)
}

View File

@ -89,7 +89,7 @@ fun main(args: Array<String>) {
exitProcess(runIRSDemo(args)) exitProcess(runIRSDemo(args))
} }
fun runIRSDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()): Int { fun runIRSDemo(args: Array<String>): Int {
val parser = OptionParser() val parser = OptionParser()
val demoArgs = setupArgs(parser) val demoArgs = setupArgs(parser)
val options = try { val options = try {
@ -107,8 +107,8 @@ fun runIRSDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()):
return when (role) { return when (role) {
IRSDemoRole.SetupNodeA -> setup(configureNodeParams(IRSDemoRole.NodeA, demoArgs, options)) IRSDemoRole.SetupNodeA -> setup(configureNodeParams(IRSDemoRole.NodeA, demoArgs, options))
IRSDemoRole.SetupNodeB -> setup(configureNodeParams(IRSDemoRole.NodeB, demoArgs, options)) IRSDemoRole.SetupNodeB -> setup(configureNodeParams(IRSDemoRole.NodeB, demoArgs, options))
IRSDemoRole.NodeA -> runNode(role, demoArgs, options, demoNodeConfig) IRSDemoRole.NodeA -> runNode(role, demoArgs, options)
IRSDemoRole.NodeB -> runNode(role, demoArgs, options, demoNodeConfig) IRSDemoRole.NodeB -> runNode(role, demoArgs, options)
IRSDemoRole.Trade -> runTrade(demoArgs, options) IRSDemoRole.Trade -> runTrade(demoArgs, options)
IRSDemoRole.Date -> runDateChange(demoArgs, options) IRSDemoRole.Date -> runDateChange(demoArgs, options)
} }
@ -156,7 +156,7 @@ private fun runDateChange(demoArgs: DemoArgs, options: OptionSet): Int {
return 0 return 0
} }
private fun runNode(role: IRSDemoRole, demoArgs: DemoArgs, options: OptionSet, demoNodeConfig: DemoConfig): Int { private fun runNode(role: IRSDemoRole, demoArgs: DemoArgs, options: OptionSet): Int {
// If these directory and identity file arguments aren't specified then we can assume a default setup and // If these directory and identity file arguments aren't specified then we can assume a default setup and
// create everything that is needed without needing to run setup. // create everything that is needed without needing to run setup.
if(!options.has(demoArgs.dirArg) && !options.has(demoArgs.fakeTradeWithIdentityFile)) { if(!options.has(demoArgs.dirArg) && !options.has(demoArgs.fakeTradeWithIdentityFile)) {
@ -165,7 +165,7 @@ private fun runNode(role: IRSDemoRole, demoArgs: DemoArgs, options: OptionSet, d
} }
try { try {
runNode(configureNodeParams(role, demoArgs, options), demoNodeConfig) runNode(configureNodeParams(role, demoArgs, options))
} catch (e: NotSetupException) { } catch (e: NotSetupException) {
println(e.message) println(e.message)
return 1 return 1
@ -252,13 +252,13 @@ private fun configureNodeParams(role: IRSDemoRole, args: DemoArgs, options: Opti
return nodeParams return nodeParams
} }
private fun runNode(nodeParams: NodeParams, demoNodeConfig: DemoConfig) : Unit { private fun runNode(nodeParams: NodeParams) : Unit {
val networkMap = createRecipient(nodeParams.mapAddress, demoNodeConfig.inMemory) val networkMap = createRecipient(nodeParams.mapAddress)
val destinations = nodeParams.tradeWithAddrs.map({ val destinations = nodeParams.tradeWithAddrs.map({
createRecipient(it, demoNodeConfig.inMemory) createRecipient(it)
}) })
val node = startNode(nodeParams, networkMap, destinations, demoNodeConfig.inMemory) val node = startNode(nodeParams, networkMap, destinations)
// Register handlers for the demo // Register handlers for the demo
AutoOfferProtocol.Handler.register(node) AutoOfferProtocol.Handler.register(node)
UpdateBusinessDayProtocol.Handler.register(node) UpdateBusinessDayProtocol.Handler.register(node)
@ -268,7 +268,6 @@ private fun runNode(nodeParams: NodeParams, demoNodeConfig: DemoConfig) : Unit {
runUploadRates("http://localhost:31341") runUploadRates("http://localhost:31341")
} }
demoNodeConfig.nodeReady.countDown()
try { try {
while (true) Thread.sleep(Long.MAX_VALUE) while (true) Thread.sleep(Long.MAX_VALUE)
} catch(e: InterruptedException) { } catch(e: InterruptedException) {
@ -276,18 +275,12 @@ private fun runNode(nodeParams: NodeParams, demoNodeConfig: DemoConfig) : Unit {
} }
} }
private fun createRecipient(addr: String, inMemory: Boolean) : SingleMessageRecipient { private fun createRecipient(addr: String) : SingleMessageRecipient {
val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT)
return if(inMemory) { return ArtemisMessagingService.makeRecipient(hostAndPort)
// Assumption here is that all nodes run in memory and thus cannot share a port number.
val id = hostAndPort.port
InMemoryMessagingNetwork.Handle(id, "Node " + id)
} else {
ArtemisMessagingService.makeRecipient(hostAndPort)
}
} }
private fun startNode(params : NodeParams, networkMap: SingleMessageRecipient, recipients: List<SingleMessageRecipient>, inMemory: Boolean) : Node { private fun startNode(params : NodeParams, networkMap: SingleMessageRecipient, recipients: List<SingleMessageRecipient>) : Node {
val config = getNodeConfig(params) val config = getNodeConfig(params)
val advertisedServices: Set<ServiceType> val advertisedServices: Set<ServiceType>
val myNetAddr = HostAndPort.fromString(params.address).withDefaultPort(Node.DEFAULT_PORT) val myNetAddr = HostAndPort.fromString(params.address).withDefaultPort(Node.DEFAULT_PORT)
@ -300,17 +293,9 @@ private fun startNode(params : NodeParams, networkMap: SingleMessageRecipient, r
nodeInfo(networkMap, params.identityFile, setOf(NetworkMapService.Type, SimpleNotaryService.Type)) nodeInfo(networkMap, params.identityFile, setOf(NetworkMapService.Type, SimpleNotaryService.Type))
} }
val node = if(inMemory) { val node = logElapsedTime("Node startup") { Node(params.dir, myNetAddr, config, networkMapId,
// Port is ID for in memory since we assume in memory is all on the same machine, thus ports are unique. advertisedServices, DemoClock(),
val messageService = messageNetwork.createNodeWithID(false, myNetAddr.port).start().get() listOf(InterestRateSwapAPI::class.java)).start() }
logElapsedTime("Node startup") { DemoNode(messageService, params.dir, myNetAddr, config, networkMapId,
advertisedServices, DemoClock(),
listOf(InterestRateSwapAPI::class.java)).start() }
} else {
logElapsedTime("Node startup") { Node(params.dir, myNetAddr, config, networkMapId,
advertisedServices, DemoClock(),
listOf(InterestRateSwapAPI::class.java)).start() }
}
// TODO: This should all be replaced by the identity service being updated // TODO: This should all be replaced by the identity service being updated
// as the network map changes. // as the network map changes.

View File

@ -24,7 +24,6 @@ import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
@ -63,10 +62,6 @@ enum class Role {
BUYER, BUYER,
SELLER SELLER
} }
private class Destination constructor(addr: Any, inMemory: Boolean): Serializable {
val inMemory = inMemory
val addr = addr
} }
// And this is the directory under the current working directory where each node will create its own server directory, // And this is the directory under the current working directory where each node will create its own server directory,
@ -77,7 +72,7 @@ fun main(args: Array<String>) {
exitProcess(runTraderDemo(args)) exitProcess(runTraderDemo(args))
} }
fun runTraderDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()): Int { fun runTraderDemo(args: Array<String>): Int {
val cashIssuerKey = generateKeyPair() val cashIssuerKey = generateKeyPair()
val cashIssuer = Party("Trusted cash issuer", cashIssuerKey.public) val cashIssuer = Party("Trusted cash issuer", cashIssuerKey.public)
val amount = 1000.DOLLARS `issued by` cashIssuer.ref(1) val amount = 1000.DOLLARS `issued by` cashIssuer.ref(1)
@ -110,19 +105,6 @@ fun runTraderDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()
} }
) )
val peerId: Int;
val id: Int
when (role) {
Role.BUYER -> {
peerId = 1
id = 0
}
Role.SELLER -> {
peerId = 0
id = 1
}
}
// Suppress the Artemis MQ noise, and activate the demo logging. // Suppress the Artemis MQ noise, and activate the demo logging.
// //
// The first two strings correspond to the first argument to StateMachineManager.add() but the way we handle logging // The first two strings correspond to the first argument to StateMachineManager.add() but the way we handle logging
@ -161,28 +143,9 @@ fun runTraderDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()
advertisedServices = emptySet() advertisedServices = emptySet()
cashIssuer = party cashIssuer = party
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
if(demoNodeConfig.inMemory) {
val handle = InMemoryMessagingNetwork.Handle(peerId, "Other Node")
NodeInfo(handle, party, setOf(NetworkMapService.Type))
} else {
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
}
} }
val messageService = messageNetwork.createNodeWithID(false, id).start().get()
// And now construct then start the node object. It takes a little while. // And now construct then start the node object. It takes a little while.
val node = logElapsedTime("Node startup") { val node = logElapsedTime("Node startup") { Node(directory, myNetAddr, config, networkMapId, advertisedServices).setup().start() }
if(demoNodeConfig.inMemory) {
DemoNode(messageService, directory, myNetAddr, config, networkMapId, advertisedServices).setup().start()
} else {
Node(directory, myNetAddr, config, networkMapId, advertisedServices).setup().start()
}
}
// TODO: Replace with a separate trusted cash issuer
if (cashIssuer == null) {
cashIssuer = node.services.storageService.myLegalIdentity
}
// What happens next depends on the role. The buyer sits around waiting for a trade to start. The seller role // What happens next depends on the role. The buyer sits around waiting for a trade to start. The seller role
// will contact the buyer and actually make something happen. // will contact the buyer and actually make something happen.
@ -190,24 +153,14 @@ fun runTraderDemo(args: Array<String>, demoNodeConfig: DemoConfig = DemoConfig()
if (role == Role.BUYER) { if (role == Role.BUYER) {
runBuyer(node, amount) runBuyer(node, amount)
} else { } else {
val dest: Destination val recipient = ArtemisMessagingService.makeRecipient(theirNetAddr)
val recipient: SingleMessageRecipient runSeller(myNetAddr, node, recipient, amount)
if(demoNodeConfig.inMemory) {
recipient = InMemoryMessagingNetwork.Handle(peerId, "Other Node")
dest = Destination(InMemoryMessagingNetwork.Handle(id, role.toString()), true)
} else {
recipient = ArtemisMessagingService.makeRecipient(theirNetAddr)
dest = Destination(myNetAddr, false)
}
runSeller(dest, node, recipient, amount))
} }
return 0 return 0
} }
private fun runSeller(myAddr: Destination, node: Node, recipient: SingleMessageRecipient) { private fun runSeller(myAddr: HostAndPort, node: Node, recipient: SingleMessageRecipient) {
// The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash. // The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash.
// //
// The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an // The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an
@ -283,12 +236,8 @@ private class TraderDemoProtocolBuyer(private val attachmentsPath: Path,
// As the seller initiates the two-party trade protocol, here, we will be the buyer. // As the seller initiates the two-party trade protocol, here, we will be the buyer.
try { try {
progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT
val origin: Destination = receive<Destination>(DEMO_TOPIC, 0).validate { it } val origin = receive<HostAndPort>(DEMO_TOPIC, 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) }
val recipient: SingleMessageRecipient = if(origin.inMemory) { val recipient = ArtemisMessagingService.makeRecipient(origin as HostAndPort)
origin.addr as InMemoryMessagingNetwork.Handle
} else {
ArtemisMessagingService.makeRecipient(origin.addr as HostAndPort)
}
// The session ID disambiguates the test trade. // The session ID disambiguates the test trade.
val sessionID = random63BitValue() val sessionID = random63BitValue()
@ -296,7 +245,7 @@ private class TraderDemoProtocolBuyer(private val attachmentsPath: Path,
send(DEMO_TOPIC, recipient, 0, sessionID) send(DEMO_TOPIC, recipient, 0, sessionID)
val notary = serviceHub.networkMapCache.notaryNodes[0] val notary = serviceHub.networkMapCache.notaryNodes[0]
val buyer = TwoPartyTradeProtocol.Buyer(recipient, notary.identity, amount, val buyer = TwoPartyTradeProtocol.Buyer(recipient, notary.identity, 1000.DOLLARS,
CommercialPaper.State::class.java, sessionID) CommercialPaper.State::class.java, sessionID)
// This invokes the trading protocol and out pops our finished transaction. // This invokes the trading protocol and out pops our finished transaction.
@ -339,7 +288,7 @@ ${Emoji.renderIfSupported(cpIssuance)}""")
} }
} }
private class TraderDemoProtocolSeller(val myAddr: Destination, private class TraderDemoProtocolSeller(val myAddr: HostAndPort,
val otherSide: SingleMessageRecipient, val otherSide: SingleMessageRecipient,
val amount: Amount<Issued<Currency>>, val amount: Amount<Issued<Currency>>,
override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() { override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() {
@ -350,21 +299,21 @@ private class TraderDemoProtocolSeller(val myAddr: Destination,
object SELF_ISSUING : ProgressTracker.Step("Got session ID back, issuing and timestamping some commercial paper") object SELF_ISSUING : ProgressTracker.Step("Got session ID back, issuing and timestamping some commercial paper")
object TRADING : ProgressTracker.Step("Starting the trade protocol") { object TRADING : ProgressTracker.Step("Starting the trade protocol")
override fun childProgressTracker(): ProgressTracker = TwoPartyTradeProtocol.Seller.tracker()
}
// We vend a progress tracker that already knows there's going to be a TwoPartyTradingProtocol involved at some // We vend a progress tracker that already knows there's going to be a TwoPartyTradingProtocol involved at some
// point: by setting up the tracker in advance, the user can see what's coming in more detail, instead of being // point: by setting up the tracker in advance, the user can see what's coming in more detail, instead of being
// surprised when it appears as a new set of tasks below the current one. // surprised when it appears as a new set of tasks below the current one.
fun tracker() = ProgressTracker(ANNOUNCING, SELF_ISSUING, TRADING) fun tracker() = ProgressTracker(ANNOUNCING, SELF_ISSUING, TRADING).apply {
childrenFor[TRADING] = TwoPartyTradeProtocol.Seller.tracker()
}
} }
@Suspendable @Suspendable
override fun call() { override fun call() {
progressTracker.currentStep = ANNOUNCING progressTracker.currentStep = ANNOUNCING
val sessionID = sendAndReceive<Long>(DEMO_TOPIC, otherSide, 0, 0, myAddress).validate { it } val sessionID = sendAndReceive<Long>(DEMO_TOPIC, otherSide, 0, 0, myAddr).validate { it }
progressTracker.currentStep = SELF_ISSUING progressTracker.currentStep = SELF_ISSUING