From 10360ae8cf7498d2037639f9b8607843287e23f3 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 2 Dec 2016 15:55:27 +0000 Subject: [PATCH] Converted MapChange into a sealed data structure so that only Modified has the previous node property --- .../client/model/NetworkIdentityModel.kt | 8 +++---- .../corda/client/model/NodeMonitorModel.kt | 8 +++---- .../core/node/services/NetworkMapCache.kt | 7 ++++-- .../net/corda/node/internal/AbstractNode.kt | 8 ++----- .../messaging/ArtemisMessagingServer.kt | 24 ++++++------------- .../node/services/messaging/RPCStructures.kt | 5 ++-- .../network/InMemoryNetworkMapCache.kt | 9 ++++--- 7 files changed, 29 insertions(+), 40 deletions(-) diff --git a/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt b/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt index 0414846247..c1673238a4 100644 --- a/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt @@ -9,7 +9,7 @@ import net.corda.client.fxutils.foldToObservableList import net.corda.client.fxutils.map import net.corda.core.crypto.CompositeKey import net.corda.core.node.NodeInfo -import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.node.services.network.NetworkMapService import java.security.PublicKey @@ -19,9 +19,9 @@ class NetworkIdentityModel { val networkIdentities: ObservableList = networkIdentityObservable.foldToObservableList(Unit) { update, _accumulator, observableList -> observableList.removeIf { - when (update.type) { - NetworkMapCache.MapChangeType.Removed -> it == update.node - NetworkMapCache.MapChangeType.Modified -> it == update.prevNodeInfo + when (update) { + is MapChange.Removed -> it == update.node + is MapChange.Modified -> it == update.previousNode else -> false } } diff --git a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt index 5949f8aa8c..5fbf0a30d6 100644 --- a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt @@ -4,7 +4,7 @@ import com.google.common.net.HostAndPort import javafx.beans.property.SimpleObjectProperty import net.corda.client.CordaRPCClient import net.corda.core.flows.StateMachineRunId -import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault import net.corda.core.transactions.SignedTransaction @@ -39,14 +39,14 @@ class NodeMonitorModel { private val transactionsSubject = PublishSubject.create() private val stateMachineTransactionMappingSubject = PublishSubject.create() private val progressTrackingSubject = PublishSubject.create() - private val networkMapSubject = PublishSubject.create() + private val networkMapSubject = PublishSubject.create() val stateMachineUpdates: Observable = stateMachineUpdatesSubject val vaultUpdates: Observable = vaultUpdatesSubject val transactions: Observable = transactionsSubject val stateMachineTransactionMapping: Observable = stateMachineTransactionMappingSubject val progressTracking: Observable = progressTrackingSubject - val networkMap: Observable = networkMapSubject + val networkMap: Observable = networkMapSubject private val clientToServiceSource = PublishSubject.create() val clientToService: PublishSubject = clientToServiceSource @@ -96,7 +96,7 @@ class NodeMonitorModel { // Parties on network val (parties, futurePartyUpdate) = proxy.networkMapUpdates() - futurePartyUpdate.startWith(parties.map { NetworkMapCache.MapChange(it, null, NetworkMapCache.MapChangeType.Added) }).subscribe(networkMapSubject) + futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject) // Client -> Service clientToServiceSource.subscribe { diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index c905c5bfd4..4aad949d96 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -19,8 +19,11 @@ import rx.Observable */ interface NetworkMapCache { - enum class MapChangeType { Added, Removed, Modified } - data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType) + sealed class MapChange(val node: NodeInfo) { + class Added(node: NodeInfo) : MapChange(node) + class Removed(node: NodeInfo) : MapChange(node) + class Modified(node: NodeInfo, val previousNode: NodeInfo) : MapChange(node) + } /** A list of all nodes the cache is aware of */ val partyNodes: List diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 39bb1909f9..34cd6bd068 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -14,7 +14,7 @@ import net.corda.core.flows.FlowStateMachine import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.* import net.corda.core.node.services.* -import net.corda.core.node.services.NetworkMapCache.MapChangeType +import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -433,18 +433,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo protected open fun makeIdentityService(): IdentityService { val service = InMemoryIdentityService() - service.registerIdentity(info.legalIdentity) - services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.legalIdentity) } - netMapCache.changed.subscribe { mapChange -> // TODO how should we handle network map removal - if (mapChange.type == MapChangeType.Added) { + if (mapChange is MapChange.Added) { service.registerIdentity(mapChange.node.legalIdentity) } } - return service } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index f438d7476f..a6f6bd5a8f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -10,7 +10,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA import net.corda.core.crypto.newSecureRandom import net.corda.core.div import net.corda.core.node.services.NetworkMapCache -import net.corda.core.node.services.NetworkMapCache.MapChangeType +import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.printBasicNodeInfo @@ -108,23 +108,13 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, maybeDeployBridgeForAddress(networkMapService) } - private fun destroyPossibleStaleBridge(change: NetworkMapCache.MapChange) { - fun removePreviousBridge() { - (change.prevNodeInfo?.address as? ArtemisAddress)?.let { - maybeDestroyBridge(it.queueName) - } - } - - if (change.type == MapChangeType.Modified) { - removePreviousBridge() - } else if (change.type == MapChangeType.Removed) { - removePreviousBridge() - // TODO Fix the network map change data classes so that the remove event doesn't have two NodeInfo fields - val address = change.node.address - if (address is ArtemisAddress) { - maybeDestroyBridge(address.queueName) - } + private fun destroyPossibleStaleBridge(change: MapChange) { + val staleNodeInfo = when (change) { + is MapChange.Modified -> change.previousNode + is MapChange.Removed -> change.node + is MapChange.Added -> return } + (staleNodeInfo.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) } } private fun configureAndStartServer() { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index ac647e80f9..e334dd6816 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -186,8 +186,9 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(ServiceEntry::class.java) register(NodeInfo::class.java) register(PhysicalLocation::class.java) - register(NetworkMapCache.MapChange::class.java) - register(NetworkMapCache.MapChangeType::class.java) + register(NetworkMapCache.MapChange.Added::class.java) + register(NetworkMapCache.MapChange.Removed::class.java) + register(NetworkMapCache.MapChange.Modified::class.java) register(ArtemisMessagingComponent.NodeAddress::class.java, read = { kryo, input -> ArtemisMessagingComponent.NodeAddress( diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index 4a2cfe60f0..3dafa190c6 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -14,7 +14,6 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.NetworkCacheError import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache.MapChange -import net.corda.core.node.services.NetworkMapCache.MapChangeType import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -92,17 +91,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach synchronized(_changed) { val previousNode = registeredNodes.put(node.legalIdentity, node) if (previousNode == null) { - _changed.onNext(MapChange(node, previousNode, MapChangeType.Added)) + _changed.onNext(MapChange.Added(node)) } else if (previousNode != node) { - _changed.onNext(MapChange(node, previousNode, MapChangeType.Modified)) + _changed.onNext(MapChange.Modified(node, previousNode)) } } } override fun removeNode(node: NodeInfo) { synchronized(_changed) { - val oldValue = registeredNodes.remove(node.legalIdentity) - _changed.onNext(MapChange(node, oldValue, MapChangeType.Removed)) + registeredNodes.remove(node.legalIdentity) + _changed.onNext(MapChange.Removed(node)) } }