diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt index 997bada9a1..d909a8eb79 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt @@ -20,6 +20,9 @@ interface NetworkMapCache { val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) } + enum class MapChangeType { Added, Removed } + data class MapChange(val node: NodeInfo, val type: MapChangeType ) + /** A list of nodes that advertise a network map service */ val networkMapNodes: List /** A list of nodes that advertise a notary service */ @@ -28,10 +31,8 @@ interface NetworkMapCache { val ratesOracleNodes: List /** A list of all nodes the cache is aware of */ val partyNodes: List - /** Observer for additions to the cache */ - val added: Observable - /** Observer for removal from the cache */ - val removed: Observable + /** Tracks changes to the network map cache */ + val changed: Observable /** * A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 1c958299a0..4b3cd105e7 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -27,6 +27,8 @@ import com.r3corda.node.services.events.NodeSchedulerService import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.keys.E2ETestKeyManagementService +import com.r3corda.core.node.services.NetworkMapCache +import com.r3corda.core.node.services.NetworkMapCache.MapChangeType import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService @@ -309,8 +311,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) } - netMapCache.added.subscribe { node -> - service.registerIdentity(node.identity) + netMapCache.changed.subscribe { mapChange -> + if(mapChange.type == MapChangeType.Added) { + service.registerIdentity(mapChange.node.identity) + } } return service diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index b56cc68424..b023efbc69 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -13,6 +13,8 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.NetworkCacheError import com.r3corda.core.node.services.NetworkMapCache +import com.r3corda.core.node.services.NetworkMapCache.MapChangeType +import com.r3corda.core.node.services.NetworkMapCache.MapChange import com.r3corda.core.node.services.ServiceType import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.SingletonSerializeAsToken @@ -45,14 +47,9 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : get() = get(NodeInterestRates.Type) override val partyNodes: List get() = registeredNodes.map { it.value } - private val _added = PublishSubject.create() - private val _removed = PublishSubject.create() - override val added: Observable - get() = _added - override val removed: Observable - get() = _removed + private val _changed = PublishSubject.create() + override val changed: Observable = _changed - private var listener: (node: NodeInfo, added: Boolean) -> Unit? = { a, b -> Unit} private var registeredForPush = false protected var registeredNodes = Collections.synchronizedMap(HashMap()) @@ -104,12 +101,12 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : override fun addNode(node: NodeInfo) { registeredNodes[node.identity] = node netInternal?.registerTrustedAddress(node.address) - _added.onNext(node) + _changed.onNext(MapChange(node, MapChangeType.Added)) } override fun removeNode(node: NodeInfo) { registeredNodes.remove(node.identity) - _removed.onNext(node) + _changed.onNext(MapChange(node, MapChangeType.Removed)) } /** diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt index b652393804..ea31653359 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt @@ -5,6 +5,7 @@ import com.r3corda.core.crypto.DummyPublicKey import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.NetworkMapCache import rx.Observable import rx.subjects.PublishSubject @@ -12,12 +13,7 @@ import rx.subjects.PublishSubject * Network map cache with no backing map service. */ class MockNetworkMapCache() : InMemoryNetworkMapCache(null) { - private val _added = PublishSubject.create() - private val _removed = PublishSubject.create() - override val added: Observable - get() = _added - override val removed: Observable - get() = _removed + override val changed: Observable = PublishSubject.create() data class MockAddress(val id: String): SingleMessageRecipient