From a3138ab0dcb4e65c5660cc172f42d365330ca6d9 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 14 Dec 2016 13:47:30 +0000 Subject: [PATCH] Address PR comments --- .../net/corda/client/NodeMonitorModelTest.kt | 16 +- .../net/corda/core/messaging/Messaging.kt | 2 +- .../core/node/services/NetworkMapCache.kt | 11 +- .../net/corda/core/node/services/PartyInfo.kt | 11 +- .../RaftValidatingNotaryServiceTests.kt | 218 +++++++++--------- .../kotlin/net/corda/node/driver/Driver.kt | 57 ++++- .../messaging/ArtemisMessagingComponent.kt | 18 +- .../messaging/ArtemisMessagingServer.kt | 10 +- .../statemachine/FlowStateMachineImpl.kt | 20 +- .../statemachine/StateMachineManager.kt | 31 ++- .../statemachine/StateMachineManagerTests.kt | 16 +- 11 files changed, 217 insertions(+), 193 deletions(-) diff --git a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt index 2f85a897f1..f2b99f2266 100644 --- a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt @@ -19,6 +19,7 @@ import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.flows.CashCommand import net.corda.flows.CashFlow +import net.corda.node.driver.callSuspendResume import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.configureTestSSL @@ -34,14 +35,11 @@ import org.junit.Before import org.junit.Test import rx.Observable import rx.Observer -import java.util.concurrent.CountDownLatch -import kotlin.concurrent.thread class NodeMonitorModelTest { lateinit var aliceNode: NodeInfo lateinit var notaryNode: NodeInfo - val stopDriver = CountDownLatch(1) - var driverThread: Thread? = null + lateinit var stopDriver: () -> Unit lateinit var stateMachineTransactionMapping: Observable lateinit var stateMachineUpdates: Observable @@ -54,8 +52,7 @@ class NodeMonitorModelTest { @Before fun start() { - val driverStarted = CountDownLatch(1) - driverThread = thread { + stopDriver = callSuspendResume { suspend -> driver { val cashUser = User("user1", "test", permissions = setOf(startFlowPermission())) val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser)) @@ -75,17 +72,14 @@ class NodeMonitorModelTest { clientToService = monitor.clientToService monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password) - driverStarted.countDown() - stopDriver.await() + suspend() } } - driverStarted.await() } @After fun stop() { - stopDriver.countDown() - driverThread?.join() + stopDriver() } @Test diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt index 4627c3306f..16f437f00a 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt @@ -80,7 +80,7 @@ interface MessagingService { */ fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message - /** Given information about either a specific node or a service returns it's corresponding address */ + /** Given information about either a specific node or a service returns its corresponding address */ fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients /** Returns an address that refers to this node. */ diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 64868070c3..6ead561818 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -73,19 +73,12 @@ interface NetworkMapCache { return candidates.singleOrNull() } - /** - * Look up all nodes advertising the service owned by [compositeKey] - */ + /** Look up all nodes advertising the service owned by [compositeKey] */ fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List { return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } } } - /** - * Returns information about the party, which may be a specific node or a service - * - * @party The party we would like the address of. - * @return The address of the party, if found. - */ + /** Returns information about the party, which may be a specific node or a service */ fun getPartyInfo(party: Party): PartyInfo? /** diff --git a/core/src/main/kotlin/net/corda/core/node/services/PartyInfo.kt b/core/src/main/kotlin/net/corda/core/node/services/PartyInfo.kt index d0408cf327..b34086992a 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/PartyInfo.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/PartyInfo.kt @@ -7,7 +7,12 @@ import net.corda.core.node.ServiceEntry /** * Holds information about a [Party], which may refer to either a specific node or a service. */ -sealed class PartyInfo(val party: Party) { - class Node(val node: NodeInfo) : PartyInfo(node.legalIdentity) - class Service(val service: ServiceEntry) : PartyInfo(service.identity) +sealed class PartyInfo() { + abstract val party: Party + class Node(val node: NodeInfo) : PartyInfo() { + override val party = node.legalIdentity + } + class Service(val service: ServiceEntry) : PartyInfo() { + override val party = service.identity + } } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt index d0900cb754..c2b3c8080f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt @@ -12,6 +12,8 @@ import net.corda.core.serialization.OpaqueBytes import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.flows.CashFlowResult +import net.corda.node.driver.NodeHandle +import net.corda.node.driver.callSuspendResume import net.corda.node.driver.driver import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent @@ -20,140 +22,130 @@ import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.testing.expect import net.corda.testing.expectEvents import net.corda.testing.replicate +import org.junit.After +import org.junit.Before import org.junit.Test import rx.Observable -import java.net.Inet6Address -import java.net.InetAddress import java.util.* import kotlin.test.assertEquals class RaftValidatingNotaryServiceTests { + lateinit var stopDriver: () -> Unit + lateinit var alice: NodeInfo + lateinit var notaries: List + lateinit var aliceProxy: CordaRPCOps + lateinit var raftNotaryIdentity: Party + lateinit var notaryStateMachines: Observable> + + @Before + fun start() { + stopDriver = callSuspendResume { suspend -> + driver { + // Start Alice and 3 raft notaries + val clusterSize = 3 + val testUser = User("test", "test", permissions = setOf(startFlowPermission())) + val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser)) + val notariesFuture = startNotaryCluster( + "Notary", + rpcUsers = listOf(testUser), + clusterSize = clusterSize, + type = RaftValidatingNotaryService.type + ) + + alice = aliceFuture.get().nodeInfo + val (notaryIdentity, notaryNodes) = notariesFuture.get() + raftNotaryIdentity = notaryIdentity + notaries = notaryNodes + + assertEquals(notaries.size, clusterSize) + assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) + + // Connect to Alice and the notaries + fun connectRpc(node: NodeInfo): CordaRPCOps { + val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL()) + client.start("test", "test") + return client.proxy() + } + aliceProxy = connectRpc(alice) + val notaryProxies = notaries.map { connectRpc(it.nodeInfo) } + notaryStateMachines = Observable.from(notaryProxies.map { proxy -> + proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) } + }).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed() + + suspend() + } + } + } + + @After + fun stop() { + stopDriver() + } + @Test fun `notarisation requests are distributed evenly in raft cluster`() { - driver { - // Start Alice and 3 raft notaries - val clusterSize = 3 - val testUser = User("test", "test", permissions = setOf(startFlowPermission())) - val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser)) - val notariesFuture = startNotaryCluster( - "Notary", - rpcUsers = listOf(testUser), - clusterSize = clusterSize, - type = RaftValidatingNotaryService.type - ) + // Issue 100 pounds, then pay ourselves 50x2 pounds + val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity)) + require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + for (i in 1..50) { + val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) + require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + } - val alice = aliceFuture.get().nodeInfo - val (raftNotaryIdentity, notaries) = notariesFuture.get() - - assertEquals(notaries.size, clusterSize) - assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) - - // Connect to Alice and the notaries - fun connectRpc(node: NodeInfo): CordaRPCOps { - val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL()) - client.start("test", "test") - return client.proxy() - } - val aliceProxy = connectRpc(alice) - val notaryProxies = notaries.map { connectRpc(it.nodeInfo) } - val notaryStateMachines = Observable.from(notaryProxies.map { proxy -> - proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) } - }).flatMap { it } - - // Issue 100 pounds, then pay ourselves 50x2 pounds - val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity)) - require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success) - for (i in 1 .. 50) { - val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) - require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) - } - - // The state machines added in the notaries should map one-to-one to notarisation requests - val notarisationsPerNotary = HashMap() - notaryStateMachines.expectEvents(isStrict = false) { - replicate>(50) { - expect(match = { it.second is StateMachineUpdate.Added }) { - val (notary, update) = it - update as StateMachineUpdate.Added - notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 } - } + // The state machines added in the notaries should map one-to-one to notarisation requests + val notarisationsPerNotary = HashMap() + notaryStateMachines.expectEvents(isStrict = false) { + replicate>(50) { + expect(match = { it.second is StateMachineUpdate.Added }) { + val (notary, update) = it + update as StateMachineUpdate.Added + notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 } } } - - // The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin - println("Notarisation distribution: $notarisationsPerNotary") - require(notarisationsPerNotary.size == 3) - // We allow some leeway for artemis as it doesn't always produce perfect distribution - require(notarisationsPerNotary.values.all { it > 10 }) } + + // The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin + println("Notarisation distribution: $notarisationsPerNotary") + require(notarisationsPerNotary.size == 3) + // We allow some leeway for artemis as it doesn't always produce perfect distribution + require(notarisationsPerNotary.values.all { it > 10 }) } @Test fun `cluster survives if a notary is killed`() { - driver { - // Start Alice and 3 raft notaries - val clusterSize = 3 - val testUser = User("test", "test", permissions = setOf(startFlowPermission())) - val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser)) - val notariesFuture = startNotaryCluster( - "Notary", - rpcUsers = listOf(testUser), - clusterSize = clusterSize, - type = RaftValidatingNotaryService.type - ) + // Issue 100 pounds, then pay ourselves 10x5 pounds + val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity)) + require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + for (i in 1..10) { + val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) + require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + } - val alice = aliceFuture.get().nodeInfo - val (raftNotaryIdentity, notaries) = notariesFuture.get() + // Now kill a notary + with(notaries[0].process) { + destroy() + waitFor() + } - assertEquals(notaries.size, clusterSize) - assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) + // Pay ourselves another 10x5 pounds + for (i in 1..10) { + val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) + require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + } - // Connect to Alice and the notaries - fun connectRpc(node: NodeInfo): CordaRPCOps { - val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL()) - client.start("test", "test") - return client.proxy() - } - val aliceProxy = connectRpc(alice) - val notaryProxies = notaries.map { connectRpc(it.nodeInfo) } - val notaryStateMachines = Observable.from(notaryProxies.map { proxy -> - proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) } - }).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed() - - // Issue 100 pounds, then pay ourselves 10x5 pounds - val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity)) - require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success) - for (i in 1 .. 10) { - val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) - require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) - } - - // Now kill a notary - notaries[0].process.apply { - destroy() - waitFor() - } - - // Pay ourselves another 10x5 pounds - for (i in 1 .. 10) { - val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) - require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) - } - - // Artemis still dispatches some requests to the dead notary but all others should go through. - val notarisationsPerNotary = HashMap() - notaryStateMachines.expectEvents(isStrict = false) { - replicate>(15) { - expect(match = { it.second is StateMachineUpdate.Added }) { - val (notary, update) = it - update as StateMachineUpdate.Added - notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 } - } + // Artemis still dispatches some requests to the dead notary but all others should go through. + val notarisationsPerNotary = HashMap() + notaryStateMachines.expectEvents(isStrict = false) { + replicate>(15) { + expect(match = { it.second is StateMachineUpdate.Added }) { + val (notary, update) = it + update as StateMachineUpdate.Added + notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 } } } - - println("Notarisation distribution: $notarisationsPerNotary") - require(notarisationsPerNotary.size == 3) } + + println("Notarisation distribution: $notarisationsPerNotary") + require(notarisationsPerNotary.size == 3) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index 982505a4f6..8bfafc5dbf 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -5,12 +5,12 @@ package net.corda.node.driver import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.module.SimpleModule import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions -import net.corda.core.ThreadBox +import net.corda.core.* import net.corda.core.crypto.Party -import net.corda.core.div -import net.corda.core.future import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType @@ -33,10 +33,13 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch import java.util.concurrent.Future import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread import kotlin.test.assertEquals /** @@ -160,6 +163,40 @@ fun driver( dsl = dsl ) +/** + * Executes the passed in closure in a new thread, providing a function that suspends the closure, passing control back + * to the caller's context. The returned function may be used to then resume the closure. + * + * This can be used in conjunction with the driver to create @Before/@After blocks that start/shutdown the driver: + * + * val stopDriver = callSuspendResume { suspend -> + * driver(someOption = someValue) { + * .. initialise some test variables .. + * suspend() + * } + * } + * .. do tests .. + * stopDriver() + */ +fun callSuspendResume(closure: (suspend: () -> Unit) -> C): () -> C { + val suspendLatch = CountDownLatch(1) + val resumeLatch = CountDownLatch(1) + val returnFuture = CompletableFuture() + thread { + returnFuture.complete( + closure { + suspendLatch.countDown() + resumeLatch.await() + } + ) + } + suspendLatch.await() + return { + resumeLatch.countDown() + returnFuture.get() + } +} + /** * This is a helper method to allow extending of the DSL, along the lines of * interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface @@ -322,7 +359,7 @@ open class DriverDSL( } override fun startNode(providedName: String?, advertisedServices: Set, - rpcUsers: List, customOverrides: Map): Future { + rpcUsers: List, customOverrides: Map): ListenableFuture { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() val debugPort = if (isDebug) debugPortAllocation.nextPort() else null @@ -364,7 +401,7 @@ open class DriverDSL( clusterSize: Int, type: ServiceType, rpcUsers: List - ): Future>> { + ): ListenableFuture>> { val nodeNames = (1..clusterSize).map { "Notary Node $it" } val paths = nodeNames.map { driverDirectory / it } ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName) @@ -382,15 +419,11 @@ open class DriverDSL( startNode(it, advertisedService, rpcUsers, configOverride) } - return future { - val firstNotary = firstNotaryFuture.get() + return firstNotaryFuture.flatMap { firstNotary -> val notaryParty = firstNotary.nodeInfo.notaryIdentity - val restNotaries = restNotaryFutures.map { - val notary = it.get() - assertEquals(notaryParty, notary.nodeInfo.notaryIdentity) - notary + Futures.allAsList(restNotaryFutures).map { restNotaries -> + Pair(notaryParty, listOf(firstNotary) + restNotaries) } - Pair(notaryParty, listOf(firstNotary) + restNotaries) } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt index 171c769d9a..4ce27f38de 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -74,18 +74,22 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { * may change or evolve and code that relies upon it being a simple host/port may not function correctly. * For instance it may contain onion routing data. * - * @param queueName The name of the queue this address is associated with. This is either the direct peer queue or - * an advertised service queue. + * [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's p2p queue or + * an advertised service's queue. + * + * @param queueName The name of the queue this address is associated with. * @param hostAndPort The address of the node. */ data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress { companion object { - fun asPeer(identity: CompositeKey, hostAndPort: HostAndPort) = - NodeAddress(SimpleString("$PEERS_PREFIX${identity.toBase58String()}"), hostAndPort) - fun asService(identity: CompositeKey, hostAndPort: HostAndPort) = - NodeAddress(SimpleString("$SERVICES_PREFIX${identity.toBase58String()}"), hostAndPort) + fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress { + return NodeAddress(SimpleString("$PEERS_PREFIX${peerIdentity.toBase58String()}"), hostAndPort) + } + fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress { + return NodeAddress(SimpleString("$SERVICES_PREFIX${serviceIdentity.toBase58String()}"), hostAndPort) + } } - override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)" + override fun toString(): String = "${javaClass.simpleName}(queue = $queueName, $hostAndPort)" } /** diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 805f731dd2..96d2035e5c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -122,13 +122,13 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. */ private fun destroyOrCreateBridges(change: MapChange) { - fun addAddresses(node: NodeInfo, target: HashSet) { + fun addAddresses(node: NodeInfo, targets: MutableSet) { // Add the node's address with the p2p queue. val nodeAddress = node.address as ArtemisPeerAddress - target.add(nodeAddress) + targets.add(nodeAddress) // Add the node's address with service queues, one per service. - change.node.advertisedServices.forEach { - target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort)) + node.advertisedServices.forEach { + targets.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort)) } } @@ -151,7 +151,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, maybeDestroyBridge(bridgeNameForAddress(it)) } addressesToCreateBridgesTo.forEach { - maybeDeployBridgeForAddress(it) + if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 140abbcc2c..4ec2710fd6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -169,10 +169,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable private inline fun receiveInternal(session: FlowSession): M { - return suspendAndExpectReceive(ReceiveOnly(session, M::class.java)) + return suspendAndExpectReceive(ReceiveOnly(session, M::class.java)).second } private inline fun sendAndReceiveInternal(session: FlowSession, message: SessionMessage): M { + return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java)).second + } + + private inline fun sendAndReceiveInternalWithParty(session: FlowSession, message: SessionMessage): Pair { return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java)) } @@ -199,10 +203,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, openSessions[Pair(sessionFlow, otherParty)] = session val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) - val sessionInitResponse = sendAndReceiveInternal(session, sessionInit) + val (peerParty, sessionInitResponse) = sendAndReceiveInternalWithParty(session, sessionInit) if (sessionInitResponse is SessionConfirm) { require(session.state is FlowSessionState.Initiating) - session.state = FlowSessionState.Initiated(sessionInitResponse.peerParty, sessionInitResponse.initiatedSessionId) + session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId) return session } else { sessionInitResponse as SessionReject @@ -211,8 +215,8 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - private fun suspendAndExpectReceive(receiveRequest: ReceiveRequest): M { - fun getReceivedMessage(): ExistingSessionMessage? = receiveRequest.session.receivedMessages.poll() + private fun suspendAndExpectReceive(receiveRequest: ReceiveRequest): Pair { + fun getReceivedMessage(): Pair? = receiveRequest.session.receivedMessages.poll() val polledMessage = getReceivedMessage() val receivedMessage = if (polledMessage != null) { @@ -228,11 +232,11 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, ?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $receiveRequest") } - if (receivedMessage is SessionEnd) { + if (receivedMessage.second is SessionEnd) { openSessions.values.remove(receiveRequest.session) throw FlowSessionException("Counterparty on ${receiveRequest.session.state.sendToParty} has prematurely ended on $receiveRequest") - } else if (receiveRequest.receiveType.isInstance(receivedMessage)) { - return receiveRequest.receiveType.cast(receivedMessage) + } else if (receiveRequest.receiveType.isInstance(receivedMessage.second)) { + return Pair(receivedMessage.first, receiveRequest.receiveType.cast(receivedMessage.second)) } else { throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $receiveRequest") } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index e9188700f1..5ec0e726a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -28,6 +28,8 @@ import net.corda.core.utilities.trace import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiated +import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiating import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.bufferUntilDatabaseCommit @@ -214,17 +216,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg -> executor.checkOnThread() val sessionMessage = message.data.deserialize() - when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage) - is SessionInit -> { - // TODO Look up the party with the full X.500 name instead of just the legal name - val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity - if (otherParty != null) { - onSessionInit(sessionMessage, otherParty) - } else { - logger.error("Unknown peer ${message.peer} in $sessionMessage") - } + val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity + if (otherParty != null) { + when (sessionMessage) { + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, otherParty) + is SessionInit -> onSessionInit(sessionMessage, otherParty) } + } else { + logger.error("Unknown peer ${message.peer} in $sessionMessage") } } } @@ -238,14 +237,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } - private fun onExistingSessionMessage(message: ExistingSessionMessage) { + private fun onExistingSessionMessage(message: ExistingSessionMessage, otherParty: Party) { val session = openSessions[message.recipientSessionId] if (session != null) { session.psm.logger.trace { "Received $message on $session" } if (message is SessionEnd) { openSessions.remove(message.recipientSessionId) } - session.receivedMessages += message + session.receivedMessages += Pair(otherParty, message) if (session.waitingForResponse) { // We only want to resume once, so immediately reset the flag. session.waitingForResponse = false @@ -278,12 +277,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val psm = createFiber(flow) val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(otherParty, otherPartySessionId)) if (sessionInit.firstPayload != null) { - session.receivedMessages += SessionData(session.ourSessionId, sessionInit.firstPayload) + session.receivedMessages += Pair(otherParty, SessionData(session.ourSessionId, sessionInit.firstPayload)) } openSessions[session.ourSessionId] = session psm.openSessions[Pair(flow, otherParty)] = session updateCheckpoint(psm) - sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId, serviceHub.myInfo.legalIdentity), psm) + sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId), psm) psm.logger.debug { "Initiated from $sessionInit on $session" } startFiber(psm) } else { @@ -465,7 +464,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, interface SessionInitResponse : ExistingSessionMessage - data class SessionConfirm(val initiatorSessionId: Long, val initiatedSessionId: Long, val peerParty: Party) : SessionInitResponse { + data class SessionConfirm(val initiatorSessionId: Long, val initiatedSessionId: Long) : SessionInitResponse { override val recipientSessionId: Long get() = initiatorSessionId } @@ -510,7 +509,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, var state: FlowSessionState, @Volatile var waitingForResponse: Boolean = false ) { - val receivedMessages = ConcurrentLinkedQueue() + val receivedMessages = ConcurrentLinkedQueue>() val psm: FlowStateMachineImpl<*> get() = flow.fsm as FlowStateMachineImpl<*> } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index e037803ba3..d7090810e9 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -53,7 +53,7 @@ class StateMachineManagerTests { node1 = nodes.first node2 = nodes.second val notaryKeyPair = generateKeyPair() - // Note that these notaries don't operate correctly as they don's share their state. They are only used for testing + // Note that these notaries don't operate correctly as they don't share their state. They are only used for testing // service addressing. notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000") notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000") @@ -216,14 +216,14 @@ class StateMachineManagerTests { assertSessionTransfers(node2, node1 sent sessionInit(SendFlow::class, payload) to node2, - node2 sent sessionConfirm(node2) to node1, + node2 sent sessionConfirm() to node1, node1 sent sessionEnd() to node2 //There's no session end from the other flows as they're manually suspended ) assertSessionTransfers(node3, node1 sent sessionInit(SendFlow::class, payload) to node3, - node3 sent sessionConfirm(node3) to node1, + node3 sent sessionConfirm() to node1, node1 sent sessionEnd() to node3 //There's no session end from the other flows as they're manually suspended ) @@ -249,14 +249,14 @@ class StateMachineManagerTests { assertSessionTransfers(node2, node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, - node2 sent sessionConfirm(node2) to node1, + node2 sent sessionConfirm() to node1, node2 sent sessionData(node2Payload) to node1, node2 sent sessionEnd() to node1 ) assertSessionTransfers(node3, node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node3, - node3 sent sessionConfirm(node3) to node1, + node3 sent sessionConfirm() to node1, node3 sent sessionData(node3Payload) to node1, node3 sent sessionEnd() to node1 ) @@ -270,7 +270,7 @@ class StateMachineManagerTests { assertSessionTransfers( node1 sent sessionInit(PingPongFlow::class, 10L) to node2, - node2 sent sessionConfirm(node2) to node1, + node2 sent sessionConfirm() to node1, node2 sent sessionData(20L) to node1, node1 sent sessionData(11L) to node2, node2 sent sessionData(21L) to node1, @@ -337,7 +337,7 @@ class StateMachineManagerTests { assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java) assertSessionTransfers( node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, - node2 sent sessionConfirm(node2) to node1, + node2 sent sessionConfirm() to node1, node2 sent sessionEnd() to node1 ) } @@ -359,7 +359,7 @@ class StateMachineManagerTests { private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload) - private fun sessionConfirm(mockNode: MockNode) = SessionConfirm(0, 0, mockNode.info.legalIdentity) + private fun sessionConfirm() = SessionConfirm(0, 0) private fun sessionData(payload: Any) = SessionData(0, payload)