Trader demo now works as a test using in memory messaging.

This commit is contained in:
Clinton Alexander 2016-06-08 11:58:46 +01:00 committed by Andras Slemmer
parent 89b8b164f7
commit 929b752b42
2 changed files with 45 additions and 24 deletions

View File

@ -33,6 +33,7 @@ import com.r3corda.protocols.TwoPartyTradeProtocol
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import joptsimple.OptionParser import joptsimple.OptionParser
import joptsimple.OptionSet import joptsimple.OptionSet
import java.io.Serializable
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
@ -64,6 +65,11 @@ enum class Role {
SELLER SELLER
} }
private class Destination constructor(addr: Any, inMemory: Boolean): Serializable {
val inMemory = inMemory
val addr = addr
}
// And this is the directory under the current working directory where each node will create its own server directory, // And this is the directory under the current working directory where each node will create its own server directory,
// which holds things like checkpoints, keys, databases, message logs etc. // which holds things like checkpoints, keys, databases, message logs etc.
val DIRNAME = "trader-demo" val DIRNAME = "trader-demo"
@ -105,6 +111,19 @@ fun runTraderDemo(args: Array<String>, useInMemoryMessaging: Boolean = false): I
} }
) )
val peerId: Int;
val id: Int
when (role) {
Role.BUYER -> {
peerId = 1
id = 0
}
Role.SELLER -> {
peerId = 0
id = 1
}
}
// Suppress the Artemis MQ noise, and activate the demo logging. // Suppress the Artemis MQ noise, and activate the demo logging.
// //
// The first two strings correspond to the first argument to StateMachineManager.add() but the way we handle logging // The first two strings correspond to the first argument to StateMachineManager.add() but the way we handle logging
@ -123,19 +142,6 @@ fun runTraderDemo(args: Array<String>, useInMemoryMessaging: Boolean = false): I
NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load())) NodeConfigurationFromConfig(override.withFallback(ConfigFactory.load()))
} }
val peerId: Int;
val id: Int
when (role) {
Role.BUYER -> {
peerId = 1
id = 0
}
Role.SELLER -> {
peerId = 0
id = 1
}
}
// Which services will this instance of the node provide to the network? // Which services will this instance of the node provide to the network?
val advertisedServices: Set<ServiceType> val advertisedServices: Set<ServiceType>
@ -181,13 +187,24 @@ fun runTraderDemo(args: Array<String>, useInMemoryMessaging: Boolean = false): I
if (role == Role.BUYER) { if (role == Role.BUYER) {
runBuyer(node, amount) runBuyer(node, amount)
} else { } else {
runSeller(myNetAddr, node, theirNetAddr, amount) val dest: Destination
val recipient: SingleMessageRecipient
if(useInMemoryMessaging) {
recipient = InMemoryMessagingNetwork.Handle(peerId, "Other Node")
dest = Destination(InMemoryMessagingNetwork.Handle(id, role.toString()), true)
} else {
recipient = ArtemisMessagingService.makeRecipient(theirNetAddr)
dest = Destination(myNetAddr, false)
}
runSeller(dest, node, recipient, amount))
} }
return 0 return 0
} }
private fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) { private fun runSeller(myAddr: Destination, node: Node, recipient: SingleMessageRecipient) {
// The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash. // The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash.
// //
// The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an // The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an
@ -206,8 +223,7 @@ private fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndP
it.second.get() it.second.get()
} }
} else { } else {
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr) val seller = TraderDemoProtocolSeller(myAddr, recipient, amount)
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide, amount)
node.smm.add("demo.seller", seller).get() node.smm.add("demo.seller", seller).get()
} }
@ -264,16 +280,20 @@ private class TraderDemoProtocolBuyer(private val attachmentsPath: Path,
// As the seller initiates the two-party trade protocol, here, we will be the buyer. // As the seller initiates the two-party trade protocol, here, we will be the buyer.
try { try {
progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT
val hostname = receive<HostAndPort>(DEMO_TOPIC, 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) } val origin: Destination = receive<Destination>(DEMO_TOPIC, 0).validate { it }
val newPartnerAddr = ArtemisMessagingService.makeRecipient(hostname) val recipient: SingleMessageRecipient = if(origin.inMemory) {
origin.addr as InMemoryMessagingNetwork.Handle
} else {
ArtemisMessagingService.makeRecipient(origin.addr as HostAndPort)
}
// The session ID disambiguates the test trade. // The session ID disambiguates the test trade.
val sessionID = random63BitValue() val sessionID = random63BitValue()
progressTracker.currentStep = STARTING_BUY progressTracker.currentStep = STARTING_BUY
send(DEMO_TOPIC, newPartnerAddr, 0, sessionID) send(DEMO_TOPIC, recipient, 0, sessionID)
val notary = serviceHub.networkMapCache.notaryNodes[0] val notary = serviceHub.networkMapCache.notaryNodes[0]
val buyer = TwoPartyTradeProtocol.Buyer(newPartnerAddr, notary.identity, amount, val buyer = TwoPartyTradeProtocol.Buyer(recipient, notary.identity, amount,
CommercialPaper.State::class.java, sessionID) CommercialPaper.State::class.java, sessionID)
// This invokes the trading protocol and out pops our finished transaction. // This invokes the trading protocol and out pops our finished transaction.
@ -316,7 +336,7 @@ ${Emoji.renderIfSupported(cpIssuance)}""")
} }
} }
private class TraderDemoProtocolSeller(val myAddress: HostAndPort, private class TraderDemoProtocolSeller(val myAddr: Destination,
val otherSide: SingleMessageRecipient, val otherSide: SingleMessageRecipient,
val amount: Amount<Issued<Currency>>, val amount: Amount<Issued<Currency>>,
override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() { override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() {
@ -392,7 +412,7 @@ private class TraderDemoProtocolSeller(val myAddress: HostAndPort,
// Now make a dummy transaction that moves it to a new key, just to show that resolving dependencies works. // Now make a dummy transaction that moves it to a new key, just to show that resolving dependencies works.
val move: SignedTransaction = run { val move: SignedTransaction = run {
val builder = TransactionType.General.Builder() val builder = TransactionBuilder()
CommercialPaper().generateMove(builder, issuance.tx.outRef(0), ownedBy) CommercialPaper().generateMove(builder, issuance.tx.outRef(0), ownedBy)
builder.signWith(keyPair) builder.signWith(keyPair)
val notarySignature = subProtocol(NotaryProtocol.Client(builder.toSignedTransaction(false))) val notarySignature = subProtocol(NotaryProtocol.Client(builder.toSignedTransaction(false)))

View File

@ -20,10 +20,11 @@ class TraderDemoTest {
private fun runBuyer() { private fun runBuyer() {
thread(true, false, null, "Buyer", -1, { runTraderDemo(arrayOf("--role", "BUYER"), true) }) thread(true, false, null, "Buyer", -1, { runTraderDemo(arrayOf("--role", "BUYER"), true) })
Thread.sleep(5000) Thread.sleep(15000)
} }
private fun runSeller() { private fun runSeller() {
println("Running Seller")
assertEquals(runTraderDemo(arrayOf("--role", "SELLER"), true), 0) assertEquals(runTraderDemo(arrayOf("--role", "SELLER"), true), 0)
} }