From 44054c47f80c674543de67e4514997e099c022d0 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Wed, 27 Apr 2016 14:01:12 +0100 Subject: [PATCH] Add InMemoryNetworkMapCache --- src/main/kotlin/core/node/AbstractNode.kt | 3 +- .../core/node/services/NetworkMapCache.kt | 92 +++++++++++++++++-- .../core/node/services/RegulatorService.kt | 9 ++ .../kotlin/core/node/services/Services.kt | 5 + .../core/testing/MockNetworkMapCache.kt | 40 +++++--- src/main/kotlin/core/testing/Simulation.kt | 7 +- src/main/kotlin/core/utilities/AddOrRemove.kt | 9 ++ src/main/kotlin/demos/IRSDemo.kt | 8 +- 8 files changed, 149 insertions(+), 24 deletions(-) create mode 100644 src/main/kotlin/core/node/services/RegulatorService.kt create mode 100644 src/main/kotlin/core/utilities/AddOrRemove.kt diff --git a/src/main/kotlin/core/node/AbstractNode.kt b/src/main/kotlin/core/node/AbstractNode.kt index 796bbff4e7..7d372f4b2e 100644 --- a/src/main/kotlin/core/node/AbstractNode.kt +++ b/src/main/kotlin/core/node/AbstractNode.kt @@ -98,10 +98,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, } timestamperAddress } else { + info.advertisedServices += TimestamperService.Type inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock) NodeInfo(net.myAddress, storage.myLegalIdentity, setOf(TimestamperService.Type)) } - (services.networkMapCache as MockNetworkMapCache).timestampingNodes.add(tsid) + (services.networkMapCache as MockNetworkMapCache).addRegistration(tsid) } lateinit var interestRatesService: NodeInterestRates.Service diff --git a/src/main/kotlin/core/node/services/NetworkMapCache.kt b/src/main/kotlin/core/node/services/NetworkMapCache.kt index ae135c53ff..770847999a 100644 --- a/src/main/kotlin/core/node/services/NetworkMapCache.kt +++ b/src/main/kotlin/core/node/services/NetworkMapCache.kt @@ -1,21 +1,101 @@ package core.node.services +import core.Contract +import core.Party import core.node.NodeInfo +import core.utilities.AddOrRemove +import org.slf4j.LoggerFactory +import java.security.SignatureException +import java.util.* +import javax.annotation.concurrent.ThreadSafe /** * A network map contains lists of nodes on the network along with information about their identity keys, services - * they provide and host names or IP addresses where they can be connected to. A reasonable architecture for the - * network map service might be one like the Tor directory authorities, where several nodes linked by RAFT or Paxos - * elect a leader and that leader distributes signed documents describing the network layout. Those documents can - * then be cached by every node and thus a network map can be retrieved given only a single successful peer connection. - * - * This interface assumes fast, synchronous access to an in-memory map. + * they provide and host names or IP addresses where they can be connected to. The cache wraps around a map fetched + * from an authoritative service, and adds easy lookup of the data stored within it. Generally it would be initialised + * with a specified network map service, which it fetches data from and then subscribes to updates of. */ interface NetworkMapCache { + companion object { + val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) + } + + /** A list of nodes that advertise a timestamping service */ val timestampingNodes: List + /** A list of nodes that advertise a rates oracle service */ val ratesOracleNodes: List + /** A list of all nodes the cache is aware of */ val partyNodes: List + /** A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outwith + * the scope of the network map service, and this is intended solely as a sanity check on configuration stored + * elsewhere. + */ val regulators: List + /** + * Look up the node info for a party. + */ fun nodeForPartyName(name: String): NodeInfo? = partyNodes.singleOrNull { it.identity.name == name } + + /** + * Get a copy of all nodes in the map. + */ + fun get(): Collection + + /** + * Get the collection of nodes which advertise a specific service. + */ + fun get(serviceType: ServiceType): Collection + + /** + * Get a recommended node that advertises a service, and is suitable for the specified contract and parties. + * Implementations might understand, for example, the correct regulator to use for specific contracts/parties, + * or the appropriate oracle for a contract. + */ + fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? + + /** + * Adds a node to the local cache (generally only used for adding ourselves) + */ + fun addNode(node: NodeInfo) + + /** + * Removes a node from the local cache + */ + fun removeNode(node: NodeInfo) } + +/** + * Extremely simple in-memory cache of the network map. + */ +@ThreadSafe +open class InMemoryNetworkMapCache() : NetworkMapCache { + override val regulators: List + get() = get(RegulatorService.Type) + override val timestampingNodes: List + get() = get(TimestamperService.Type) + override val ratesOracleNodes: List + get() = get(NodeInterestRates.Type) + override val partyNodes: List + get() = registeredNodes.map { it.value } + + private var registeredForPush = false + protected var registeredNodes = Collections.synchronizedMap(HashMap()) + + override fun get() = registeredNodes.map { it.value } + override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value } + override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull() + + override fun addNode(node: NodeInfo) { + registeredNodes[node.identity] = node + } + + override fun removeNode(node: NodeInfo) { + registeredNodes.remove(node.identity) + } +} + +sealed class NetworkCacheError : Exception() { + /** Indicates a failure to deregister, because of a rejected request from the remote node */ + class DeregistrationFailed : NetworkCacheError() +} \ No newline at end of file diff --git a/src/main/kotlin/core/node/services/RegulatorService.kt b/src/main/kotlin/core/node/services/RegulatorService.kt new file mode 100644 index 0000000000..0c8e6476ea --- /dev/null +++ b/src/main/kotlin/core/node/services/RegulatorService.kt @@ -0,0 +1,9 @@ +package core.node.services + +/** + * Placeholder interface for regulator services. + */ +interface RegulatorService { + object Type : ServiceType("corda.regulator") + +} \ No newline at end of file diff --git a/src/main/kotlin/core/node/services/Services.kt b/src/main/kotlin/core/node/services/Services.kt index 28156265cb..fb57ac24c0 100644 --- a/src/main/kotlin/core/node/services/Services.kt +++ b/src/main/kotlin/core/node/services/Services.kt @@ -12,6 +12,11 @@ import java.security.PublicKey import java.time.Clock import java.util.* +/** + * Postfix for base topics when sending a request to a service. + */ +val TOPIC_DEFAULT_POSTFIX = ".0" + /** * This file defines various 'services' which are not currently fleshed out. A service is a module that provides * immutable snapshots of data that may be changing in response to user or network events. diff --git a/src/main/kotlin/core/testing/MockNetworkMapCache.kt b/src/main/kotlin/core/testing/MockNetworkMapCache.kt index ef01975e27..4c3dd41d3a 100644 --- a/src/main/kotlin/core/testing/MockNetworkMapCache.kt +++ b/src/main/kotlin/core/testing/MockNetworkMapCache.kt @@ -7,23 +7,41 @@ */ package core.testing +import co.paralleluniverse.common.util.VisibleForTesting import core.Party import core.crypto.DummyPublicKey import core.messaging.SingleMessageRecipient -import core.node.services.NetworkMapCache +import core.node.services.InMemoryNetworkMapCache import core.node.NodeInfo -import java.util.* -class MockNetworkMapCache : NetworkMapCache { - data class MockAddress(val id: String) : SingleMessageRecipient - - override val timestampingNodes = Collections.synchronizedList(ArrayList()) - override val ratesOracleNodes = Collections.synchronizedList(ArrayList()) - override val partyNodes = Collections.synchronizedList(ArrayList()) - override val regulators = Collections.synchronizedList(ArrayList()) +/** + * Network map cache with no backing map service. + */ +class MockNetworkMapCache() : InMemoryNetworkMapCache() { + data class MockAddress(val id: String): SingleMessageRecipient init { - partyNodes.add(NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")))) - partyNodes.add(NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")))) + var mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C"))) + var mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D"))) + registeredNodes[mockNodeA.identity] = mockNodeA + registeredNodes[mockNodeB.identity] = mockNodeB + } + + /** + * Directly add a registration to the internal cache. DOES NOT fire the change listeners, as it's + * not a change being received. + */ + @VisibleForTesting + fun addRegistration(node: NodeInfo) { + registeredNodes[node.identity] = node + } + + /** + * Directly remove a registration from the internal cache. DOES NOT fire the change listeners, as it's + * not a change being received. + */ + @VisibleForTesting + fun deleteRegistration(identity: Party) : Boolean { + return registeredNodes.remove(identity) != null } } \ No newline at end of file diff --git a/src/main/kotlin/core/testing/Simulation.kt b/src/main/kotlin/core/testing/Simulation.kt index f4931d93e0..62eeb0042c 100644 --- a/src/main/kotlin/core/testing/Simulation.kt +++ b/src/main/kotlin/core/testing/Simulation.kt @@ -114,8 +114,11 @@ abstract class Simulation(val runAsync: Boolean, // Now wire up the network maps for each node. // TODO: This is obviously bogus: there should be a single network map for the whole simulated network. for (node in regulators + serviceProviders + banks) { - (node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes += ratesOracle.info - (node.services.networkMapCache as MockNetworkMapCache).regulators += regulators.map { it.info } + val cache = (node.services.networkMapCache as MockNetworkMapCache) + cache.addRegistration(ratesOracle.info) + regulators.forEach { regulator -> + cache.addRegistration(regulator.info) + } } } diff --git a/src/main/kotlin/core/utilities/AddOrRemove.kt b/src/main/kotlin/core/utilities/AddOrRemove.kt new file mode 100644 index 0000000000..44dea24592 --- /dev/null +++ b/src/main/kotlin/core/utilities/AddOrRemove.kt @@ -0,0 +1,9 @@ +package core.utilities + +/** + * Enum for when adding/removing something, for example adding or removing an entry in a directory. + */ +enum class AddOrRemove { + ADD, + REMOVE +} \ No newline at end of file diff --git a/src/main/kotlin/demos/IRSDemo.kt b/src/main/kotlin/demos/IRSDemo.kt index 3d581b9b6f..b5f8005e62 100644 --- a/src/main/kotlin/demos/IRSDemo.kt +++ b/src/main/kotlin/demos/IRSDemo.kt @@ -94,10 +94,10 @@ fun main(args: Array) { val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, DemoClock()).start() } // Add self to network map - (node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(node.info) + (node.services.networkMapCache as MockNetworkMapCache).addRegistration(node.info) - // Add rates oracle to network map - (node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes.add(rateOracleId) + // Add rates oracle to network map if one has been specified + rateOracleId?.let { (node.services.networkMapCache as MockNetworkMapCache).addRegistration(it) } val hostAndPortStrings = options.valuesOf(fakeTradeWithAddr) val identityFiles = options.valuesOf(fakeTradeWithIdentityFile) @@ -107,7 +107,7 @@ fun main(args: Array) { for ((hostAndPortString, identityFile) in hostAndPortStrings.zip(identityFiles)) { try { val peerId = nodeInfo(hostAndPortString, identityFile) - (node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(peerId) + (node.services.networkMapCache as MockNetworkMapCache).addRegistration(peerId) node.services.identityService.registerIdentity(peerId.identity) } catch (e: Exception) { }