Converted MapChange into a sealed data structure so that only Modified has the previous node property

This commit is contained in:
Shams Asari 2016-12-02 15:55:27 +00:00
parent 9fdbf4e888
commit 10360ae8cf
7 changed files with 29 additions and 40 deletions

View File

@ -9,7 +9,7 @@ import net.corda.client.fxutils.foldToObservableList
import net.corda.client.fxutils.map
import net.corda.core.crypto.CompositeKey
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.node.services.network.NetworkMapService
import java.security.PublicKey
@ -19,9 +19,9 @@ class NetworkIdentityModel {
val networkIdentities: ObservableList<NodeInfo> =
networkIdentityObservable.foldToObservableList(Unit) { update, _accumulator, observableList ->
observableList.removeIf {
when (update.type) {
NetworkMapCache.MapChangeType.Removed -> it == update.node
NetworkMapCache.MapChangeType.Modified -> it == update.prevNodeInfo
when (update) {
is MapChange.Removed -> it == update.node
is MapChange.Modified -> it == update.previousNode
else -> false
}
}

View File

@ -4,7 +4,7 @@ import com.google.common.net.HostAndPort
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.CordaRPCClient
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.transactions.SignedTransaction
@ -39,14 +39,14 @@ class NodeMonitorModel {
private val transactionsSubject = PublishSubject.create<SignedTransaction>()
private val stateMachineTransactionMappingSubject = PublishSubject.create<StateMachineTransactionMapping>()
private val progressTrackingSubject = PublishSubject.create<ProgressTrackingEvent>()
private val networkMapSubject = PublishSubject.create<NetworkMapCache.MapChange>()
private val networkMapSubject = PublishSubject.create<MapChange>()
val stateMachineUpdates: Observable<StateMachineUpdate> = stateMachineUpdatesSubject
val vaultUpdates: Observable<Vault.Update> = vaultUpdatesSubject
val transactions: Observable<SignedTransaction> = transactionsSubject
val stateMachineTransactionMapping: Observable<StateMachineTransactionMapping> = stateMachineTransactionMappingSubject
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<NetworkMapCache.MapChange> = networkMapSubject
val networkMap: Observable<MapChange> = networkMapSubject
private val clientToServiceSource = PublishSubject.create<CashCommand>()
val clientToService: PublishSubject<CashCommand> = clientToServiceSource
@ -96,7 +96,7 @@ class NodeMonitorModel {
// Parties on network
val (parties, futurePartyUpdate) = proxy.networkMapUpdates()
futurePartyUpdate.startWith(parties.map { NetworkMapCache.MapChange(it, null, NetworkMapCache.MapChangeType.Added) }).subscribe(networkMapSubject)
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)
// Client -> Service
clientToServiceSource.subscribe {

View File

@ -19,8 +19,11 @@ import rx.Observable
*/
interface NetworkMapCache {
enum class MapChangeType { Added, Removed, Modified }
data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType)
sealed class MapChange(val node: NodeInfo) {
class Added(node: NodeInfo) : MapChange(node)
class Removed(node: NodeInfo) : MapChange(node)
class Modified(node: NodeInfo, val previousNode: NodeInfo) : MapChange(node)
}
/** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo>

View File

@ -14,7 +14,7 @@ import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -433,18 +433,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected open fun makeIdentityService(): IdentityService {
val service = InMemoryIdentityService()
service.registerIdentity(info.legalIdentity)
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.legalIdentity) }
netMapCache.changed.subscribe { mapChange ->
// TODO how should we handle network map removal
if (mapChange.type == MapChangeType.Added) {
if (mapChange is MapChange.Added) {
service.registerIdentity(mapChange.node.legalIdentity)
}
}
return service
}

View File

@ -10,7 +10,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.crypto.newSecureRandom
import net.corda.core.div
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
@ -108,23 +108,13 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
maybeDeployBridgeForAddress(networkMapService)
}
private fun destroyPossibleStaleBridge(change: NetworkMapCache.MapChange) {
fun removePreviousBridge() {
(change.prevNodeInfo?.address as? ArtemisAddress)?.let {
maybeDestroyBridge(it.queueName)
}
}
if (change.type == MapChangeType.Modified) {
removePreviousBridge()
} else if (change.type == MapChangeType.Removed) {
removePreviousBridge()
// TODO Fix the network map change data classes so that the remove event doesn't have two NodeInfo fields
val address = change.node.address
if (address is ArtemisAddress) {
maybeDestroyBridge(address.queueName)
}
private fun destroyPossibleStaleBridge(change: MapChange) {
val staleNodeInfo = when (change) {
is MapChange.Modified -> change.previousNode
is MapChange.Removed -> change.node
is MapChange.Added -> return
}
(staleNodeInfo.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) }
}
private fun configureAndStartServer() {

View File

@ -186,8 +186,9 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
register(NetworkMapCache.MapChange::class.java)
register(NetworkMapCache.MapChangeType::class.java)
register(NetworkMapCache.MapChange.Added::class.java)
register(NetworkMapCache.MapChange.Removed::class.java)
register(NetworkMapCache.MapChange.Modified::class.java)
register(ArtemisMessagingComponent.NodeAddress::class.java,
read = { kryo, input ->
ArtemisMessagingComponent.NodeAddress(

View File

@ -14,7 +14,6 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.NetworkCacheError
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -92,17 +91,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
synchronized(_changed) {
val previousNode = registeredNodes.put(node.legalIdentity, node)
if (previousNode == null) {
_changed.onNext(MapChange(node, previousNode, MapChangeType.Added))
_changed.onNext(MapChange.Added(node))
} else if (previousNode != node) {
_changed.onNext(MapChange(node, previousNode, MapChangeType.Modified))
_changed.onNext(MapChange.Modified(node, previousNode))
}
}
}
override fun removeNode(node: NodeInfo) {
synchronized(_changed) {
val oldValue = registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange(node, oldValue, MapChangeType.Removed))
registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange.Removed(node))
}
}