Address PR comments

This commit is contained in:
Andras Slemmer
2016-12-14 13:47:30 +00:00
committed by exfalso
parent 4f44962962
commit a3138ab0dc
11 changed files with 217 additions and 193 deletions

View File

@ -19,6 +19,7 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashCommand import net.corda.flows.CashCommand
import net.corda.flows.CashFlow import net.corda.flows.CashFlow
import net.corda.node.driver.callSuspendResume
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.services.User import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL import net.corda.node.services.config.configureTestSSL
@ -34,14 +35,11 @@ import org.junit.Before
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import rx.Observer import rx.Observer
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
class NodeMonitorModelTest { class NodeMonitorModelTest {
lateinit var aliceNode: NodeInfo lateinit var aliceNode: NodeInfo
lateinit var notaryNode: NodeInfo lateinit var notaryNode: NodeInfo
val stopDriver = CountDownLatch(1) lateinit var stopDriver: () -> Unit
var driverThread: Thread? = null
lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping> lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping>
lateinit var stateMachineUpdates: Observable<StateMachineUpdate> lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
@ -54,8 +52,7 @@ class NodeMonitorModelTest {
@Before @Before
fun start() { fun start() {
val driverStarted = CountDownLatch(1) stopDriver = callSuspendResume { suspend ->
driverThread = thread {
driver { driver {
val cashUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>())) val cashUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser)) val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser))
@ -75,17 +72,14 @@ class NodeMonitorModelTest {
clientToService = monitor.clientToService clientToService = monitor.clientToService
monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password) monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password)
driverStarted.countDown() suspend()
stopDriver.await()
} }
} }
driverStarted.await()
} }
@After @After
fun stop() { fun stop() {
stopDriver.countDown() stopDriver()
driverThread?.join()
} }
@Test @Test

View File

@ -80,7 +80,7 @@ interface MessagingService {
*/ */
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
/** Given information about either a specific node or a service returns it's corresponding address */ /** Given information about either a specific node or a service returns its corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
/** Returns an address that refers to this node. */ /** Returns an address that refers to this node. */

View File

@ -73,19 +73,12 @@ interface NetworkMapCache {
return candidates.singleOrNull() return candidates.singleOrNull()
} }
/** /** Look up all nodes advertising the service owned by [compositeKey] */
* Look up all nodes advertising the service owned by [compositeKey]
*/
fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List<NodeInfo> { fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List<NodeInfo> {
return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } } return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } }
} }
/** /** Returns information about the party, which may be a specific node or a service */
* Returns information about the party, which may be a specific node or a service
*
* @party The party we would like the address of.
* @return The address of the party, if found.
*/
fun getPartyInfo(party: Party): PartyInfo? fun getPartyInfo(party: Party): PartyInfo?
/** /**

View File

@ -7,7 +7,12 @@ import net.corda.core.node.ServiceEntry
/** /**
* Holds information about a [Party], which may refer to either a specific node or a service. * Holds information about a [Party], which may refer to either a specific node or a service.
*/ */
sealed class PartyInfo(val party: Party) { sealed class PartyInfo() {
class Node(val node: NodeInfo) : PartyInfo(node.legalIdentity) abstract val party: Party
class Service(val service: ServiceEntry) : PartyInfo(service.identity) class Node(val node: NodeInfo) : PartyInfo() {
override val party = node.legalIdentity
}
class Service(val service: ServiceEntry) : PartyInfo() {
override val party = service.identity
}
} }

View File

@ -12,6 +12,8 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand import net.corda.flows.CashCommand
import net.corda.flows.CashFlow import net.corda.flows.CashFlow
import net.corda.flows.CashFlowResult import net.corda.flows.CashFlowResult
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.callSuspendResume
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.services.config.configureTestSSL import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent import net.corda.node.services.messaging.ArtemisMessagingComponent
@ -20,140 +22,130 @@ import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.expect import net.corda.testing.expect
import net.corda.testing.expectEvents import net.corda.testing.expectEvents
import net.corda.testing.replicate import net.corda.testing.replicate
import org.junit.After
import org.junit.Before
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import java.net.Inet6Address
import java.net.InetAddress
import java.util.* import java.util.*
import kotlin.test.assertEquals import kotlin.test.assertEquals
class RaftValidatingNotaryServiceTests { class RaftValidatingNotaryServiceTests {
lateinit var stopDriver: () -> Unit
lateinit var alice: NodeInfo
lateinit var notaries: List<NodeHandle>
lateinit var aliceProxy: CordaRPCOps
lateinit var raftNotaryIdentity: Party
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
@Before
fun start() {
stopDriver = callSuspendResume { suspend ->
driver {
// Start Alice and 3 raft notaries
val clusterSize = 3
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser))
val notariesFuture = startNotaryCluster(
"Notary",
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
)
alice = aliceFuture.get().nodeInfo
val (notaryIdentity, notaryNodes) = notariesFuture.get()
raftNotaryIdentity = notaryIdentity
notaries = notaryNodes
assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
// Connect to Alice and the notaries
fun connectRpc(node: NodeInfo): CordaRPCOps {
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
client.start("test", "test")
return client.proxy()
}
aliceProxy = connectRpc(alice)
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
suspend()
}
}
}
@After
fun stop() {
stopDriver()
}
@Test @Test
fun `notarisation requests are distributed evenly in raft cluster`() { fun `notarisation requests are distributed evenly in raft cluster`() {
driver { // Issue 100 pounds, then pay ourselves 50x2 pounds
// Start Alice and 3 raft notaries val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
val clusterSize = 3 require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>())) for (i in 1..50) {
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser)) val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
val notariesFuture = startNotaryCluster( require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
"Notary", }
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
)
val alice = aliceFuture.get().nodeInfo // The state machines added in the notaries should map one-to-one to notarisation requests
val (raftNotaryIdentity, notaries) = notariesFuture.get() val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
assertEquals(notaries.size, clusterSize) replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
// Connect to Alice and the notaries update as StateMachineUpdate.Added
fun connectRpc(node: NodeInfo): CordaRPCOps { notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
client.start("test", "test")
return client.proxy()
}
val aliceProxy = connectRpc(alice)
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
val notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it }
// Issue 100 pounds, then pay ourselves 50x2 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1 .. 50) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// The state machines added in the notaries should map one-to-one to notarisation requests
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
}
} }
} }
// The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
// We allow some leeway for artemis as it doesn't always produce perfect distribution
require(notarisationsPerNotary.values.all { it > 10 })
} }
// The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
// We allow some leeway for artemis as it doesn't always produce perfect distribution
require(notarisationsPerNotary.values.all { it > 10 })
} }
@Test @Test
fun `cluster survives if a notary is killed`() { fun `cluster survives if a notary is killed`() {
driver { // Issue 100 pounds, then pay ourselves 10x5 pounds
// Start Alice and 3 raft notaries val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
val clusterSize = 3 require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>())) for (i in 1..10) {
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser)) val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
val notariesFuture = startNotaryCluster( require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
"Notary", }
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
)
val alice = aliceFuture.get().nodeInfo // Now kill a notary
val (raftNotaryIdentity, notaries) = notariesFuture.get() with(notaries[0].process) {
destroy()
waitFor()
}
assertEquals(notaries.size, clusterSize) // Pay ourselves another 10x5 pounds
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) for (i in 1..10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Connect to Alice and the notaries // Artemis still dispatches some requests to the dead notary but all others should go through.
fun connectRpc(node: NodeInfo): CordaRPCOps { val notarisationsPerNotary = HashMap<Party, Int>()
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL()) notaryStateMachines.expectEvents(isStrict = false) {
client.start("test", "test") replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
return client.proxy() expect(match = { it.second is StateMachineUpdate.Added }) {
} val (notary, update) = it
val aliceProxy = connectRpc(alice) update as StateMachineUpdate.Added
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) } notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
val notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
// Issue 100 pounds, then pay ourselves 10x5 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1 .. 10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Now kill a notary
notaries[0].process.apply {
destroy()
waitFor()
}
// Pay ourselves another 10x5 pounds
for (i in 1 .. 10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Artemis still dispatches some requests to the dead notary but all others should go through.
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
}
} }
} }
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
} }
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
} }
} }

View File

@ -5,12 +5,12 @@ package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.core.ThreadBox import net.corda.core.*
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.future
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
@ -33,10 +33,13 @@ import java.time.Instant
import java.time.ZoneOffset.UTC import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import java.util.* import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
/** /**
@ -160,6 +163,40 @@ fun <A> driver(
dsl = dsl dsl = dsl
) )
/**
* Executes the passed in closure in a new thread, providing a function that suspends the closure, passing control back
* to the caller's context. The returned function may be used to then resume the closure.
*
* This can be used in conjunction with the driver to create @Before/@After blocks that start/shutdown the driver:
*
* val stopDriver = callSuspendResume { suspend ->
* driver(someOption = someValue) {
* .. initialise some test variables ..
* suspend()
* }
* }
* .. do tests ..
* stopDriver()
*/
fun <C> callSuspendResume(closure: (suspend: () -> Unit) -> C): () -> C {
val suspendLatch = CountDownLatch(1)
val resumeLatch = CountDownLatch(1)
val returnFuture = CompletableFuture<C>()
thread {
returnFuture.complete(
closure {
suspendLatch.countDown()
resumeLatch.await()
}
)
}
suspendLatch.await()
return {
resumeLatch.countDown()
returnFuture.get()
}
}
/** /**
* This is a helper method to allow extending of the DSL, along the lines of * This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface * interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface
@ -322,7 +359,7 @@ open class DriverDSL(
} }
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>, override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, customOverrides: Map<String, Any?>): Future<NodeHandle> { rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
val messagingAddress = portAllocation.nextHostAndPort() val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@ -364,7 +401,7 @@ open class DriverDSL(
clusterSize: Int, clusterSize: Int,
type: ServiceType, type: ServiceType,
rpcUsers: List<User> rpcUsers: List<User>
): Future<Pair<Party, List<NodeHandle>>> { ): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { "Notary Node $it" } val nodeNames = (1..clusterSize).map { "Notary Node $it" }
val paths = nodeNames.map { driverDirectory / it } val paths = nodeNames.map { driverDirectory / it }
ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName) ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName)
@ -382,15 +419,11 @@ open class DriverDSL(
startNode(it, advertisedService, rpcUsers, configOverride) startNode(it, advertisedService, rpcUsers, configOverride)
} }
return future { return firstNotaryFuture.flatMap { firstNotary ->
val firstNotary = firstNotaryFuture.get()
val notaryParty = firstNotary.nodeInfo.notaryIdentity val notaryParty = firstNotary.nodeInfo.notaryIdentity
val restNotaries = restNotaryFutures.map { Futures.allAsList(restNotaryFutures).map { restNotaries ->
val notary = it.get() Pair(notaryParty, listOf(firstNotary) + restNotaries)
assertEquals(notaryParty, notary.nodeInfo.notaryIdentity)
notary
} }
Pair(notaryParty, listOf(firstNotary) + restNotaries)
} }
} }

View File

@ -74,18 +74,22 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* may change or evolve and code that relies upon it being a simple host/port may not function correctly. * may change or evolve and code that relies upon it being a simple host/port may not function correctly.
* For instance it may contain onion routing data. * For instance it may contain onion routing data.
* *
* @param queueName The name of the queue this address is associated with. This is either the direct peer queue or * [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's p2p queue or
* an advertised service queue. * an advertised service's queue.
*
* @param queueName The name of the queue this address is associated with.
* @param hostAndPort The address of the node. * @param hostAndPort The address of the node.
*/ */
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress { data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object { companion object {
fun asPeer(identity: CompositeKey, hostAndPort: HostAndPort) = fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
NodeAddress(SimpleString("$PEERS_PREFIX${identity.toBase58String()}"), hostAndPort) return NodeAddress(SimpleString("$PEERS_PREFIX${peerIdentity.toBase58String()}"), hostAndPort)
fun asService(identity: CompositeKey, hostAndPort: HostAndPort) = }
NodeAddress(SimpleString("$SERVICES_PREFIX${identity.toBase58String()}"), hostAndPort) fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress(SimpleString("$SERVICES_PREFIX${serviceIdentity.toBase58String()}"), hostAndPort)
}
} }
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)" override fun toString(): String = "${javaClass.simpleName}(queue = $queueName, $hostAndPort)"
} }
/** /**

View File

@ -122,13 +122,13 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/ */
private fun destroyOrCreateBridges(change: MapChange) { private fun destroyOrCreateBridges(change: MapChange) {
fun addAddresses(node: NodeInfo, target: HashSet<ArtemisPeerAddress>) { fun addAddresses(node: NodeInfo, targets: MutableSet<ArtemisPeerAddress>) {
// Add the node's address with the p2p queue. // Add the node's address with the p2p queue.
val nodeAddress = node.address as ArtemisPeerAddress val nodeAddress = node.address as ArtemisPeerAddress
target.add(nodeAddress) targets.add(nodeAddress)
// Add the node's address with service queues, one per service. // Add the node's address with service queues, one per service.
change.node.advertisedServices.forEach { node.advertisedServices.forEach {
target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort)) targets.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
} }
} }
@ -151,7 +151,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
maybeDestroyBridge(bridgeNameForAddress(it)) maybeDestroyBridge(bridgeNameForAddress(it))
} }
addressesToCreateBridgesTo.forEach { addressesToCreateBridgesTo.forEach {
maybeDeployBridgeForAddress(it) if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it)
} }
} }

View File

@ -169,10 +169,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable @Suspendable
private inline fun <reified M : SessionMessage> receiveInternal(session: FlowSession): M { private inline fun <reified M : SessionMessage> receiveInternal(session: FlowSession): M {
return suspendAndExpectReceive(ReceiveOnly(session, M::class.java)) return suspendAndExpectReceive(ReceiveOnly(session, M::class.java)).second
} }
private inline fun <reified M : SessionMessage> sendAndReceiveInternal(session: FlowSession, message: SessionMessage): M { private inline fun <reified M : SessionMessage> sendAndReceiveInternal(session: FlowSession, message: SessionMessage): M {
return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java)).second
}
private inline fun <reified M : SessionMessage> sendAndReceiveInternalWithParty(session: FlowSession, message: SessionMessage): Pair<Party, M> {
return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java)) return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java))
} }
@ -199,10 +203,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
openSessions[Pair(sessionFlow, otherParty)] = session openSessions[Pair(sessionFlow, otherParty)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name
val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload)
val sessionInitResponse = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit) val (peerParty, sessionInitResponse) = sendAndReceiveInternalWithParty<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) { if (sessionInitResponse is SessionConfirm) {
require(session.state is FlowSessionState.Initiating) require(session.state is FlowSessionState.Initiating)
session.state = FlowSessionState.Initiated(sessionInitResponse.peerParty, sessionInitResponse.initiatedSessionId) session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
return session return session
} else { } else {
sessionInitResponse as SessionReject sessionInitResponse as SessionReject
@ -211,8 +215,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
private fun <M : SessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): M { private fun <M : SessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): Pair<Party, M> {
fun getReceivedMessage(): ExistingSessionMessage? = receiveRequest.session.receivedMessages.poll() fun getReceivedMessage(): Pair<Party, ExistingSessionMessage>? = receiveRequest.session.receivedMessages.poll()
val polledMessage = getReceivedMessage() val polledMessage = getReceivedMessage()
val receivedMessage = if (polledMessage != null) { val receivedMessage = if (polledMessage != null) {
@ -228,11 +232,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $receiveRequest") ?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $receiveRequest")
} }
if (receivedMessage is SessionEnd) { if (receivedMessage.second is SessionEnd) {
openSessions.values.remove(receiveRequest.session) openSessions.values.remove(receiveRequest.session)
throw FlowSessionException("Counterparty on ${receiveRequest.session.state.sendToParty} has prematurely ended on $receiveRequest") throw FlowSessionException("Counterparty on ${receiveRequest.session.state.sendToParty} has prematurely ended on $receiveRequest")
} else if (receiveRequest.receiveType.isInstance(receivedMessage)) { } else if (receiveRequest.receiveType.isInstance(receivedMessage.second)) {
return receiveRequest.receiveType.cast(receivedMessage) return Pair(receivedMessage.first, receiveRequest.receiveType.cast(receivedMessage.second))
} else { } else {
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $receiveRequest") throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $receiveRequest")
} }

View File

@ -28,6 +28,8 @@ import net.corda.core.utilities.trace
import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiated
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiating
import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.bufferUntilDatabaseCommit import net.corda.node.utilities.bufferUntilDatabaseCommit
@ -214,17 +216,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg -> serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg ->
executor.checkOnThread() executor.checkOnThread()
val sessionMessage = message.data.deserialize<SessionMessage>() val sessionMessage = message.data.deserialize<SessionMessage>()
when (sessionMessage) { val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage) if (otherParty != null) {
is SessionInit -> { when (sessionMessage) {
// TODO Look up the party with the full X.500 name instead of just the legal name is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, otherParty)
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity is SessionInit -> onSessionInit(sessionMessage, otherParty)
if (otherParty != null) {
onSessionInit(sessionMessage, otherParty)
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
}
} }
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
} }
} }
} }
@ -238,14 +237,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
} }
} }
private fun onExistingSessionMessage(message: ExistingSessionMessage) { private fun onExistingSessionMessage(message: ExistingSessionMessage, otherParty: Party) {
val session = openSessions[message.recipientSessionId] val session = openSessions[message.recipientSessionId]
if (session != null) { if (session != null) {
session.psm.logger.trace { "Received $message on $session" } session.psm.logger.trace { "Received $message on $session" }
if (message is SessionEnd) { if (message is SessionEnd) {
openSessions.remove(message.recipientSessionId) openSessions.remove(message.recipientSessionId)
} }
session.receivedMessages += message session.receivedMessages += Pair(otherParty, message)
if (session.waitingForResponse) { if (session.waitingForResponse) {
// We only want to resume once, so immediately reset the flag. // We only want to resume once, so immediately reset the flag.
session.waitingForResponse = false session.waitingForResponse = false
@ -278,12 +277,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val psm = createFiber(flow) val psm = createFiber(flow)
val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(otherParty, otherPartySessionId)) val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(otherParty, otherPartySessionId))
if (sessionInit.firstPayload != null) { if (sessionInit.firstPayload != null) {
session.receivedMessages += SessionData(session.ourSessionId, sessionInit.firstPayload) session.receivedMessages += Pair(otherParty, SessionData(session.ourSessionId, sessionInit.firstPayload))
} }
openSessions[session.ourSessionId] = session openSessions[session.ourSessionId] = session
psm.openSessions[Pair(flow, otherParty)] = session psm.openSessions[Pair(flow, otherParty)] = session
updateCheckpoint(psm) updateCheckpoint(psm)
sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId, serviceHub.myInfo.legalIdentity), psm) sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId), psm)
psm.logger.debug { "Initiated from $sessionInit on $session" } psm.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(psm) startFiber(psm)
} else { } else {
@ -465,7 +464,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
interface SessionInitResponse : ExistingSessionMessage interface SessionInitResponse : ExistingSessionMessage
data class SessionConfirm(val initiatorSessionId: Long, val initiatedSessionId: Long, val peerParty: Party) : SessionInitResponse { data class SessionConfirm(val initiatorSessionId: Long, val initiatedSessionId: Long) : SessionInitResponse {
override val recipientSessionId: Long get() = initiatorSessionId override val recipientSessionId: Long get() = initiatorSessionId
} }
@ -510,7 +509,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
var state: FlowSessionState, var state: FlowSessionState,
@Volatile var waitingForResponse: Boolean = false @Volatile var waitingForResponse: Boolean = false
) { ) {
val receivedMessages = ConcurrentLinkedQueue<ExistingSessionMessage>() val receivedMessages = ConcurrentLinkedQueue<Pair<Party, ExistingSessionMessage>>()
val psm: FlowStateMachineImpl<*> get() = flow.fsm as FlowStateMachineImpl<*> val psm: FlowStateMachineImpl<*> get() = flow.fsm as FlowStateMachineImpl<*>
} }

View File

@ -53,7 +53,7 @@ class StateMachineManagerTests {
node1 = nodes.first node1 = nodes.first
node2 = nodes.second node2 = nodes.second
val notaryKeyPair = generateKeyPair() val notaryKeyPair = generateKeyPair()
// Note that these notaries don't operate correctly as they don's share their state. They are only used for testing // Note that these notaries don't operate correctly as they don't share their state. They are only used for testing
// service addressing. // service addressing.
notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000") notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000") notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
@ -216,14 +216,14 @@ class StateMachineManagerTests {
assertSessionTransfers(node2, assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, payload) to node2, node1 sent sessionInit(SendFlow::class, payload) to node2,
node2 sent sessionConfirm(node2) to node1, node2 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node2 node1 sent sessionEnd() to node2
//There's no session end from the other flows as they're manually suspended //There's no session end from the other flows as they're manually suspended
) )
assertSessionTransfers(node3, assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, payload) to node3, node1 sent sessionInit(SendFlow::class, payload) to node3,
node3 sent sessionConfirm(node3) to node1, node3 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node3 node1 sent sessionEnd() to node3
//There's no session end from the other flows as they're manually suspended //There's no session end from the other flows as they're manually suspended
) )
@ -249,14 +249,14 @@ class StateMachineManagerTests {
assertSessionTransfers(node2, assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm(node2) to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionData(node2Payload) to node1, node2 sent sessionData(node2Payload) to node1,
node2 sent sessionEnd() to node1 node2 sent sessionEnd() to node1
) )
assertSessionTransfers(node3, assertSessionTransfers(node3,
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node3, node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node3,
node3 sent sessionConfirm(node3) to node1, node3 sent sessionConfirm() to node1,
node3 sent sessionData(node3Payload) to node1, node3 sent sessionData(node3Payload) to node1,
node3 sent sessionEnd() to node1 node3 sent sessionEnd() to node1
) )
@ -270,7 +270,7 @@ class StateMachineManagerTests {
assertSessionTransfers( assertSessionTransfers(
node1 sent sessionInit(PingPongFlow::class, 10L) to node2, node1 sent sessionInit(PingPongFlow::class, 10L) to node2,
node2 sent sessionConfirm(node2) to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionData(20L) to node1, node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2, node1 sent sessionData(11L) to node2,
node2 sent sessionData(21L) to node1, node2 sent sessionData(21L) to node1,
@ -337,7 +337,7 @@ class StateMachineManagerTests {
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java) assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java)
assertSessionTransfers( assertSessionTransfers(
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm(node2) to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionEnd() to node1 node2 sent sessionEnd() to node1
) )
} }
@ -359,7 +359,7 @@ class StateMachineManagerTests {
private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload) private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload)
private fun sessionConfirm(mockNode: MockNode) = SessionConfirm(0, 0, mockNode.info.legalIdentity) private fun sessionConfirm() = SessionConfirm(0, 0)
private fun sessionData(payload: Any) = SessionData(0, payload) private fun sessionData(payload: Any) = SessionData(0, payload)