From 9ef1bf03556cb52a1ed5d8ee3af31a51d170b5a3 Mon Sep 17 00:00:00 2001 From: Clinton Alexander Date: Tue, 2 Aug 2016 17:08:22 +0100 Subject: [PATCH] Identity service now updates with the network map. --- .../com/r3corda/core/node/services/NetworkMapCache.kt | 5 +++++ .../kotlin/com/r3corda/node/internal/AbstractNode.kt | 4 +++- .../node/services/network/InMemoryNetworkMapCache.kt | 11 +++++++++++ .../node/services/network/MockNetworkMapCache.kt | 9 +++++++++ src/main/kotlin/com/r3corda/demos/IRSDemo.kt | 5 ----- 5 files changed, 28 insertions(+), 6 deletions(-) diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt index 4490da7716..ec879ae0cb 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt @@ -7,6 +7,7 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import org.slf4j.LoggerFactory import java.security.PublicKey +import rx.Observable /** * A network map contains lists of nodes on the network along with information about their identity keys, services @@ -27,6 +28,10 @@ interface NetworkMapCache { val ratesOracleNodes: List /** A list of all nodes the cache is aware of */ val partyNodes: List + /* Observer for changes to the cache */ + val added: Observable + val removed: Observable + /** * A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside * the scope of the network map service, and this is intended solely as a sanity check on configuration stored diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index a9ec16792c..1c958299a0 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -309,7 +309,9 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) } - // TODO: Subscribe to updates to the network map cache + netMapCache.added.subscribe { node -> + service.registerIdentity(node.identity) + } return service } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 07af6b3782..b56cc68424 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -23,6 +23,8 @@ import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.utilities.AddOrRemove +import rx.Observable +import rx.subjects.PublishSubject import java.security.PublicKey import java.security.SignatureException import java.util.* @@ -43,7 +45,14 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : get() = get(NodeInterestRates.Type) override val partyNodes: List get() = registeredNodes.map { it.value } + private val _added = PublishSubject.create() + private val _removed = PublishSubject.create() + override val added: Observable + get() = _added + override val removed: Observable + get() = _removed + private var listener: (node: NodeInfo, added: Boolean) -> Unit? = { a, b -> Unit} private var registeredForPush = false protected var registeredNodes = Collections.synchronizedMap(HashMap()) @@ -95,10 +104,12 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : override fun addNode(node: NodeInfo) { registeredNodes[node.identity] = node netInternal?.registerTrustedAddress(node.address) + _added.onNext(node) } override fun removeNode(node: NodeInfo) { registeredNodes.remove(node.identity) + _removed.onNext(node) } /** diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt index bf93437555..b652393804 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt @@ -5,11 +5,20 @@ import com.r3corda.core.crypto.DummyPublicKey import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo +import rx.Observable +import rx.subjects.PublishSubject /** * Network map cache with no backing map service. */ class MockNetworkMapCache() : InMemoryNetworkMapCache(null) { + private val _added = PublishSubject.create() + private val _removed = PublishSubject.create() + override val added: Observable + get() = _added + override val removed: Observable + get() = _removed + data class MockAddress(val id: String): SingleMessageRecipient init { diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index 1423f5cfd5..c6977b52c5 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -399,11 +399,6 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi val node = logElapsedTime("Node startup", log) { Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start() } - // TODO: This should all be replaced by the identity service being updated - // as the network map changes. - for (identityFile in params.tradeWithIdentities) { - node.services.identityService.registerIdentity(parsePartyFromFile(identityFile)) - } return node }