Review changes - merged two observables into one.

This commit is contained in:
Clinton Alexander 2016-08-03 16:49:37 +01:00
parent 72c4c2e5f7
commit b1c1b7f4b4
4 changed files with 19 additions and 21 deletions

View File

@ -20,6 +20,9 @@ interface NetworkMapCache {
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
} }
enum class MapChangeType { Added, Removed }
data class MapChange(val node: NodeInfo, val type: MapChangeType )
/** A list of nodes that advertise a network map service */ /** A list of nodes that advertise a network map service */
val networkMapNodes: List<NodeInfo> val networkMapNodes: List<NodeInfo>
/** A list of nodes that advertise a notary service */ /** A list of nodes that advertise a notary service */
@ -28,10 +31,8 @@ interface NetworkMapCache {
val ratesOracleNodes: List<NodeInfo> val ratesOracleNodes: List<NodeInfo>
/** A list of all nodes the cache is aware of */ /** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo> val partyNodes: List<NodeInfo>
/** Observer for additions to the cache */ /** Tracks changes to the network map cache */
val added: Observable<NodeInfo> val changed: Observable<MapChange>
/** Observer for removal from the cache */
val removed: Observable<NodeInfo>
/** /**
* A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside * A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside

View File

@ -27,6 +27,8 @@ import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.events.ScheduledActivityObserver
import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.identity.InMemoryIdentityService
import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
@ -309,8 +311,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) } services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
netMapCache.added.subscribe { node -> netMapCache.changed.subscribe { mapChange ->
service.registerIdentity(node.identity) if(mapChange.type == MapChangeType.Added) {
service.registerIdentity(mapChange.node.identity)
}
} }
return service return service

View File

@ -13,6 +13,8 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkCacheError import com.r3corda.core.node.services.NetworkCacheError
import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
import com.r3corda.core.node.services.NetworkMapCache.MapChange
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
@ -45,14 +47,9 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
get() = get(NodeInterestRates.Type) get() = get(NodeInterestRates.Type)
override val partyNodes: List<NodeInfo> override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value } get() = registeredNodes.map { it.value }
private val _added = PublishSubject.create<NodeInfo>() private val _changed = PublishSubject.create<MapChange>()
private val _removed = PublishSubject.create<NodeInfo>() override val changed: Observable<MapChange> = _changed
override val added: Observable<NodeInfo>
get() = _added
override val removed: Observable<NodeInfo>
get() = _removed
private var listener: (node: NodeInfo, added: Boolean) -> Unit? = { a, b -> Unit}
private var registeredForPush = false private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>()) protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
@ -104,12 +101,12 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
override fun addNode(node: NodeInfo) { override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node registeredNodes[node.identity] = node
netInternal?.registerTrustedAddress(node.address) netInternal?.registerTrustedAddress(node.address)
_added.onNext(node) _changed.onNext(MapChange(node, MapChangeType.Added))
} }
override fun removeNode(node: NodeInfo) { override fun removeNode(node: NodeInfo) {
registeredNodes.remove(node.identity) registeredNodes.remove(node.identity)
_removed.onNext(node) _changed.onNext(MapChange(node, MapChangeType.Removed))
} }
/** /**

View File

@ -5,6 +5,7 @@ import com.r3corda.core.crypto.DummyPublicKey
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -12,12 +13,7 @@ import rx.subjects.PublishSubject
* Network map cache with no backing map service. * Network map cache with no backing map service.
*/ */
class MockNetworkMapCache() : InMemoryNetworkMapCache(null) { class MockNetworkMapCache() : InMemoryNetworkMapCache(null) {
private val _added = PublishSubject.create<NodeInfo>() override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>()
private val _removed = PublishSubject.create<NodeInfo>()
override val added: Observable<NodeInfo>
get() = _added
override val removed: Observable<NodeInfo>
get() = _removed
data class MockAddress(val id: String): SingleMessageRecipient data class MockAddress(val id: String): SingleMessageRecipient