Clean up NetworkMapCache and InMemoryNetworkMapCache

This commit is contained in:
Shams Asari 2016-12-02 14:26:12 +00:00
parent b35f711bd9
commit 5432905b4a
5 changed files with 73 additions and 123 deletions

5
.gitignore vendored
View File

@ -19,6 +19,8 @@ local.properties
lib/dokka.jar lib/dokka.jar
**/logs/*
### JetBrains template ### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio
@ -77,6 +79,3 @@ crashlytics-build.properties
# docs related # docs related
docs/virtualenv/ docs/virtualenv/
/logs/
node/logs/
samples/*/logs/

View File

@ -8,7 +8,7 @@ import net.corda.core.crypto.Party
import net.corda.core.messaging.MessagingService import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import org.slf4j.LoggerFactory import net.corda.core.randomOrNull
import rx.Observable import rx.Observable
/** /**
@ -18,30 +18,26 @@ import rx.Observable
* with a specified network map service, which it fetches data from and then subscribes to updates of. * with a specified network map service, which it fetches data from and then subscribes to updates of.
*/ */
interface NetworkMapCache { interface NetworkMapCache {
companion object {
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
}
enum class MapChangeType { Added, Removed, Modified } enum class MapChangeType { Added, Removed, Modified }
data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType) data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType)
/** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo>
/** A list of nodes that advertise a network map service */ /** A list of nodes that advertise a network map service */
val networkMapNodes: List<NodeInfo> val networkMapNodes: List<NodeInfo>
/** A list of nodes that advertise a notary service */ /** A list of nodes that advertise a notary service */
val notaryNodes: List<NodeInfo> val notaryNodes: List<NodeInfo> get() = getNodesWithService(ServiceType.notary)
/** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo>
/** Tracks changes to the network map cache */
val changed: Observable<MapChange>
/** Future to track completion of the NetworkMapService registration. */
val mapServiceRegistered: ListenableFuture<Unit>
/** /**
* A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside * A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outside
* the scope of the network map service, and this is intended solely as a sanity check on configuration stored * the scope of the network map service, and this is intended solely as a sanity check on configuration stored
* elsewhere. * elsewhere.
*/ */
val regulators: List<NodeInfo> val regulatorNodes: List<NodeInfo> get() = getNodesWithService(ServiceType.regulator)
/** Tracks changes to the network map cache */
val changed: Observable<MapChange>
/** Future to track completion of the NetworkMapService registration. */
val mapServiceRegistered: ListenableFuture<Unit>
/** /**
* Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the
@ -49,32 +45,31 @@ interface NetworkMapCache {
*/ */
fun track(): Pair<List<NodeInfo>, Observable<MapChange>> fun track(): Pair<List<NodeInfo>, Observable<MapChange>>
/** /** Get the collection of nodes which advertise a specific service. */
* Get a copy of all nodes in the map. fun getNodesWithService(serviceType: ServiceType): List<NodeInfo> {
*/ return partyNodes.filter { it.advertisedServices.any { it.info.type.isSubTypeOf(serviceType) } }
fun get(): Collection<NodeInfo> }
/**
* Get the collection of nodes which advertise a specific service.
*/
fun get(serviceType: ServiceType): Collection<NodeInfo>
/** /**
* Get a recommended node that advertises a service, and is suitable for the specified contract and parties. * Get a recommended node that advertises a service, and is suitable for the specified contract and parties.
* Implementations might understand, for example, the correct regulator to use for specific contracts/parties, * Implementations might understand, for example, the correct regulator to use for specific contracts/parties,
* or the appropriate oracle for a contract. * or the appropriate oracle for a contract.
*/ */
fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = getNodesWithService(type).firstOrNull()
/** /** Look up the node info for a legal name. */
* Look up the node info for a legal name. fun getNodeByLegalName(name: String): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == name }
*/
fun getNodeByLegalName(name: String): NodeInfo?
/** /** Look up the node info for a composite key. */
* Look up the node info for a composite key. fun getNodeByCompositeKey(compositeKey: CompositeKey): NodeInfo? {
*/ // Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
fun getNodeByCompositeKey(compositeKey: CompositeKey): NodeInfo? val candidates = partyNodes.filter {
(it.legalIdentity.owningKey == compositeKey)
|| it.advertisedServices.any { it.identity.owningKey == compositeKey }
}
check(candidates.size <= 1) { "Found more than one match for key $compositeKey" }
return candidates.singleOrNull()
}
/** /**
* Given a [party], returns a node advertising it as an identity. If more than one node found the result * Given a [party], returns a node advertising it as an identity. If more than one node found the result
@ -86,29 +81,40 @@ interface NetworkMapCache {
* will be found, and this method will return a randomly chosen one. If [party] is an individual (legal) identity, * will be found, and this method will return a randomly chosen one. If [party] is an individual (legal) identity,
* we currently assume that it will be advertised by one node only, which will be returned as the result. * we currently assume that it will be advertised by one node only, which will be returned as the result.
*/ */
fun getRepresentativeNode(party: Party): NodeInfo? fun getRepresentativeNode(party: Party): NodeInfo? {
return partyNodes.randomOrNull { it.legalIdentity == party || it.advertisedServices.any { it.identity == party } }
}
/** /** Gets a notary identity by the given name. */
* Gets a notary identity by the given name. fun getNotary(name: String): Party? {
*/ val notaryNode = notaryNodes.randomOrNull {
fun getNotary(name: String): Party? it.advertisedServices.any { it.info.type.isSubTypeOf(ServiceType.notary) && it.info.name == name }
}
return notaryNode?.notaryIdentity
}
/** /**
* Returns a notary identity advertised by any of the nodes on the network (chosen at random) * Returns a notary identity advertised by any of the nodes on the network (chosen at random)
*
* @param type Limits the result to notaries of the specified type (optional) * @param type Limits the result to notaries of the specified type (optional)
*/ */
fun getAnyNotary(type: ServiceType? = null): Party? fun getAnyNotary(type: ServiceType? = null): Party? {
val nodes = if (type == null) {
notaryNodes
} else {
require(type != ServiceType.notary && type.isSubTypeOf(ServiceType.notary)) {
"The provided type must be a specific notary sub-type"
}
notaryNodes.filter { it.advertisedServices.any { it.info.type == type } }
}
return nodes.randomOrNull()?.notaryIdentity
}
/** /** Checks whether a given party is an advertised notary identity */
* Checks whether a given party is an advertised notary identity fun isNotary(party: Party): Boolean = notaryNodes.any { it.notaryIdentity == party }
*/
fun isNotary(party: Party): Boolean
/** /**
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further * Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates. * updates.
*
* @param net the network messaging service. * @param net the network messaging service.
* @param networkMapAddress the network map service to fetch current state from. * @param networkMapAddress the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates. * @param subscribe if the cache should subscribe to updates.
@ -118,27 +124,20 @@ interface NetworkMapCache {
fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit> subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
/** /** Adds a node to the local cache (generally only used for adding ourselves). */
* Adds a node to the local cache (generally only used for adding ourselves).
*/
fun addNode(node: NodeInfo) fun addNode(node: NodeInfo)
/** /** Removes a node from the local cache. */
* Removes a node from the local cache.
*/
fun removeNode(node: NodeInfo) fun removeNode(node: NodeInfo)
/** /**
* Deregister from updates from the given map service. * Deregister from updates from the given map service.
*
* @param net the network messaging service. * @param net the network messaging service.
* @param service the network map service to fetch current state from. * @param service the network map service to fetch current state from.
*/ */
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
/** /** For testing where the network map cache is manipulated marks the service as immediately ready. */
* For testing where the network map cache is manipulated marks the service as immediately ready.
*/
@VisibleForTesting @VisibleForTesting
fun runWithoutMapService() fun runWithoutMapService()
} }

View File

@ -75,7 +75,7 @@ object TwoPartyDealFlow {
abstract val myKeyPair: KeyPair abstract val myKeyPair: KeyPair
override fun getCounterpartyMarker(party: Party): Class<*> { override fun getCounterpartyMarker(party: Party): Class<*> {
return if (serviceHub.networkMapCache.regulators.any { it.legalIdentity == party }) { return if (serviceHub.networkMapCache.regulatorNodes.any { it.legalIdentity == party }) {
MarkerForBogusRegulatorFlow::class.java MarkerForBogusRegulatorFlow::class.java
} else { } else {
super.getCounterpartyMarker(party) super.getCounterpartyMarker(party)
@ -149,7 +149,7 @@ object TwoPartyDealFlow {
logger.trace { "Deal stored" } logger.trace { "Deal stored" }
progressTracker.currentStep = COPYING_TO_REGULATOR progressTracker.currentStep = COPYING_TO_REGULATOR
val regulators = serviceHub.networkMapCache.regulators val regulators = serviceHub.networkMapCache.regulatorNodes
if (regulators.isNotEmpty()) { if (regulators.isNotEmpty()) {
// Copy the transaction to every regulator in the network. This is obviously completely bogus, it's // Copy the transaction to every regulator in the network. This is obviously completely bogus, it's
// just for demo purposes. // just for demo purposes.

View File

@ -4,8 +4,6 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.Contract
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.map import net.corda.core.map
import net.corda.core.messaging.MessagingService import net.corda.core.messaging.MessagingService
@ -17,11 +15,10 @@ import net.corda.core.node.services.NetworkCacheError
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.NetworkMapCache.MapChangeType import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.node.services.ServiceType
import net.corda.core.randomOrNull
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.sendRequest import net.corda.flows.sendRequest
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
@ -36,27 +33,22 @@ import javax.annotation.concurrent.ThreadSafe
/** /**
* Extremely simple in-memory cache of the network map. * Extremely simple in-memory cache of the network map.
*
* TODO: some method implementations can be moved up to [NetworkMapCache]
*/ */
@ThreadSafe @ThreadSafe
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache { open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache {
override val networkMapNodes: List<NodeInfo> companion object {
get() = get(NetworkMapService.type) val logger = loggerFor<InMemoryNetworkMapCache>()
override val regulators: List<NodeInfo> }
get() = get(ServiceType.regulator)
override val notaryNodes: List<NodeInfo> override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
get() = get(ServiceType.notary) override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value }
private val _changed = PublishSubject.create<MapChange>() private val _changed = PublishSubject.create<MapChange>()
override val changed: Observable<MapChange> = _changed override val changed: Observable<MapChange> = _changed
private val _registrationFuture = SettableFuture.create<Unit>() private val _registrationFuture = SettableFuture.create<Unit>()
override val mapServiceRegistered: ListenableFuture<Unit> override val mapServiceRegistered: ListenableFuture<Unit> get() = _registrationFuture
get() = _registrationFuture
private var registeredForPush = false private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>()) protected var registeredNodes: MutableMap<Party, NodeInfo> = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> { override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> {
synchronized(_changed) { synchronized(_changed) {
@ -64,44 +56,6 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
} }
} }
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.any { it.info.type.isSubTypeOf(serviceType) } }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun getNodeByLegalName(name: String) = get().singleOrNull { it.legalIdentity.name == name }
override fun getNodeByCompositeKey(compositeKey: CompositeKey): NodeInfo? {
// Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
val candidates = get().filter {
(it.legalIdentity.owningKey == compositeKey)
|| it.advertisedServices.any { it.identity.owningKey == compositeKey }
}
if (candidates.size > 1) {
throw IllegalStateException("Found more than one match for key $compositeKey")
}
return candidates.singleOrNull()
}
override fun getRepresentativeNode(party: Party): NodeInfo? {
return partyNodes.randomOrNull { it.legalIdentity == party || it.advertisedServices.any { it.identity == party } }
}
override fun getNotary(name: String): Party? {
val notaryNode = notaryNodes.randomOrNull { it.advertisedServices.any { it.info.type.isSubTypeOf(ServiceType.notary) && it.info.name == name } }
return notaryNode?.notaryIdentity
}
override fun getAnyNotary(type: ServiceType?): Party? {
val nodes = if (type == null) {
notaryNodes
} else {
require(type != ServiceType.notary && type.isSubTypeOf(ServiceType.notary)) { "The provided type must be a specific notary sub-type" }
notaryNodes.filter { it.advertisedServices.any { it.info.type == type } }
}
return nodes.randomOrNull()?.notaryIdentity
}
override fun isNotary(party: Party) = notaryNodes.any { it.notaryIdentity == party }
override fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean, override fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> { ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) { if (subscribe && !registeredForPush) {
@ -114,9 +68,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
net.send(ackMessage, req.replyTo) net.send(ackMessage, req.replyTo)
processUpdatePush(req) processUpdatePush(req)
} catch(e: NodeMapError) { } catch(e: NodeMapError) {
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) { } catch(e: Exception) {
NetworkMapCache.logger.error("Exception processing update from network map service", e) logger.error("Exception processing update from network map service", e)
} }
} }
registeredForPush = true registeredForPush = true
@ -136,14 +90,13 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun addNode(node: NodeInfo) { override fun addNode(node: NodeInfo) {
synchronized(_changed) { synchronized(_changed) {
val oldValue = registeredNodes.put(node.legalIdentity, node) val previousNode = registeredNodes.put(node.legalIdentity, node)
if (oldValue == null) { if (previousNode == null) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Added)) _changed.onNext(MapChange(node, previousNode, MapChangeType.Added))
} else if (oldValue != node) { } else if (previousNode != node) {
_changed.onNext(MapChange(node, oldValue, MapChangeType.Modified)) _changed.onNext(MapChange(node, previousNode, MapChangeType.Modified))
} }
} }
} }
override fun removeNode(node: NodeInfo) { override fun removeNode(node: NodeInfo) {
@ -155,7 +108,6 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
/** /**
* Unsubscribes from updates from the given map service. * Unsubscribes from updates from the given map service.
*
* @param service the network map service to listen to updates from. * @param service the network map service to listen to updates from.
*/ */
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> { override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {

View File

@ -67,7 +67,7 @@ object FixingFlow {
val ptx = TransactionType.General.Builder(txState.notary) val ptx = TransactionType.General.Builder(txState.notary)
val oracle = serviceHub.networkMapCache.get(handshake.payload.oracleType).first() val oracle = serviceHub.networkMapCache.getNodesWithService(handshake.payload.oracleType).first()
val oracleParty = oracle.serviceIdentities(handshake.payload.oracleType).first() val oracleParty = oracle.serviceIdentities(handshake.payload.oracleType).first()
// TODO Could it be solved in better way, move filtering here not in RatesFixFlow? // TODO Could it be solved in better way, move filtering here not in RatesFixFlow?