mirror of
https://github.com/corda/corda.git
synced 2025-06-22 17:09:00 +00:00
Identity service now updates with the network map.
This commit is contained in:
@ -7,6 +7,7 @@ import com.r3corda.core.messaging.MessagingService
|
|||||||
import com.r3corda.core.node.NodeInfo
|
import com.r3corda.core.node.NodeInfo
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
import rx.Observable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
||||||
@ -27,6 +28,10 @@ interface NetworkMapCache {
|
|||||||
val ratesOracleNodes: List<NodeInfo>
|
val ratesOracleNodes: List<NodeInfo>
|
||||||
/** A list of all nodes the cache is aware of */
|
/** A list of all nodes the cache is aware of */
|
||||||
val partyNodes: List<NodeInfo>
|
val partyNodes: List<NodeInfo>
|
||||||
|
/* Observer for changes to the cache */
|
||||||
|
val added: Observable<NodeInfo>
|
||||||
|
val removed: Observable<NodeInfo>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside
|
* A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside
|
||||||
* the scope of the network map service, and this is intended solely as a sanity check on configuration stored
|
* the scope of the network map service, and this is intended solely as a sanity check on configuration stored
|
||||||
|
@ -309,7 +309,9 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
|
|
||||||
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
|
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
|
||||||
|
|
||||||
// TODO: Subscribe to updates to the network map cache
|
netMapCache.added.subscribe { node ->
|
||||||
|
service.registerIdentity(node.identity)
|
||||||
|
}
|
||||||
|
|
||||||
return service
|
return service
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ import com.r3corda.node.services.api.RegulatorService
|
|||||||
import com.r3corda.node.services.clientapi.NodeInterestRates
|
import com.r3corda.node.services.clientapi.NodeInterestRates
|
||||||
import com.r3corda.node.services.transactions.NotaryService
|
import com.r3corda.node.services.transactions.NotaryService
|
||||||
import com.r3corda.node.utilities.AddOrRemove
|
import com.r3corda.node.utilities.AddOrRemove
|
||||||
|
import rx.Observable
|
||||||
|
import rx.subjects.PublishSubject
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.security.SignatureException
|
import java.security.SignatureException
|
||||||
import java.util.*
|
import java.util.*
|
||||||
@ -43,7 +45,14 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
|
|||||||
get() = get(NodeInterestRates.Type)
|
get() = get(NodeInterestRates.Type)
|
||||||
override val partyNodes: List<NodeInfo>
|
override val partyNodes: List<NodeInfo>
|
||||||
get() = registeredNodes.map { it.value }
|
get() = registeredNodes.map { it.value }
|
||||||
|
private val _added = PublishSubject.create<NodeInfo>()
|
||||||
|
private val _removed = PublishSubject.create<NodeInfo>()
|
||||||
|
override val added: Observable<NodeInfo>
|
||||||
|
get() = _added
|
||||||
|
override val removed: Observable<NodeInfo>
|
||||||
|
get() = _removed
|
||||||
|
|
||||||
|
private var listener: (node: NodeInfo, added: Boolean) -> Unit? = { a, b -> Unit}
|
||||||
private var registeredForPush = false
|
private var registeredForPush = false
|
||||||
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
|
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
|
||||||
|
|
||||||
@ -95,10 +104,12 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
|
|||||||
override fun addNode(node: NodeInfo) {
|
override fun addNode(node: NodeInfo) {
|
||||||
registeredNodes[node.identity] = node
|
registeredNodes[node.identity] = node
|
||||||
netInternal?.registerTrustedAddress(node.address)
|
netInternal?.registerTrustedAddress(node.address)
|
||||||
|
_added.onNext(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun removeNode(node: NodeInfo) {
|
override fun removeNode(node: NodeInfo) {
|
||||||
registeredNodes.remove(node.identity)
|
registeredNodes.remove(node.identity)
|
||||||
|
_removed.onNext(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -5,11 +5,20 @@ import com.r3corda.core.crypto.DummyPublicKey
|
|||||||
import com.r3corda.core.crypto.Party
|
import com.r3corda.core.crypto.Party
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.NodeInfo
|
import com.r3corda.core.node.NodeInfo
|
||||||
|
import rx.Observable
|
||||||
|
import rx.subjects.PublishSubject
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Network map cache with no backing map service.
|
* Network map cache with no backing map service.
|
||||||
*/
|
*/
|
||||||
class MockNetworkMapCache() : InMemoryNetworkMapCache(null) {
|
class MockNetworkMapCache() : InMemoryNetworkMapCache(null) {
|
||||||
|
private val _added = PublishSubject.create<NodeInfo>()
|
||||||
|
private val _removed = PublishSubject.create<NodeInfo>()
|
||||||
|
override val added: Observable<NodeInfo>
|
||||||
|
get() = _added
|
||||||
|
override val removed: Observable<NodeInfo>
|
||||||
|
get() = _removed
|
||||||
|
|
||||||
data class MockAddress(val id: String): SingleMessageRecipient
|
data class MockAddress(val id: String): SingleMessageRecipient
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
@ -399,11 +399,6 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi
|
|||||||
val node = logElapsedTime("Node startup", log) {
|
val node = logElapsedTime("Node startup", log) {
|
||||||
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start()
|
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start()
|
||||||
}
|
}
|
||||||
// TODO: This should all be replaced by the identity service being updated
|
|
||||||
// as the network map changes.
|
|
||||||
for (identityFile in params.tradeWithIdentities) {
|
|
||||||
node.services.identityService.registerIdentity(parsePartyFromFile(identityFile))
|
|
||||||
}
|
|
||||||
|
|
||||||
return node
|
return node
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user