Add InMemoryNetworkMapCache

This commit is contained in:
Ross Nicoll 2016-04-27 14:01:12 +01:00
parent 0575bcc959
commit 44054c47f8
8 changed files with 149 additions and 24 deletions

View File

@ -98,10 +98,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
timestamperAddress
} else {
info.advertisedServices += TimestamperService.Type
inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock)
NodeInfo(net.myAddress, storage.myLegalIdentity, setOf(TimestamperService.Type))
}
(services.networkMapCache as MockNetworkMapCache).timestampingNodes.add(tsid)
(services.networkMapCache as MockNetworkMapCache).addRegistration(tsid)
}
lateinit var interestRatesService: NodeInterestRates.Service

View File

@ -1,21 +1,101 @@
package core.node.services
import core.Contract
import core.Party
import core.node.NodeInfo
import core.utilities.AddOrRemove
import org.slf4j.LoggerFactory
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* A network map contains lists of nodes on the network along with information about their identity keys, services
* they provide and host names or IP addresses where they can be connected to. A reasonable architecture for the
* network map service might be one like the Tor directory authorities, where several nodes linked by RAFT or Paxos
* elect a leader and that leader distributes signed documents describing the network layout. Those documents can
* then be cached by every node and thus a network map can be retrieved given only a single successful peer connection.
*
* This interface assumes fast, synchronous access to an in-memory map.
* they provide and host names or IP addresses where they can be connected to. The cache wraps around a map fetched
* from an authoritative service, and adds easy lookup of the data stored within it. Generally it would be initialised
* with a specified network map service, which it fetches data from and then subscribes to updates of.
*/
interface NetworkMapCache {
companion object {
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
}
/** A list of nodes that advertise a timestamping service */
val timestampingNodes: List<NodeInfo>
/** A list of nodes that advertise a rates oracle service */
val ratesOracleNodes: List<NodeInfo>
/** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo>
/** A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outwith
* the scope of the network map service, and this is intended solely as a sanity check on configuration stored
* elsewhere.
*/
val regulators: List<NodeInfo>
/**
* Look up the node info for a party.
*/
fun nodeForPartyName(name: String): NodeInfo? = partyNodes.singleOrNull { it.identity.name == name }
/**
* Get a copy of all nodes in the map.
*/
fun get(): Collection<NodeInfo>
/**
* Get the collection of nodes which advertise a specific service.
*/
fun get(serviceType: ServiceType): Collection<NodeInfo>
/**
* Get a recommended node that advertises a service, and is suitable for the specified contract and parties.
* Implementations might understand, for example, the correct regulator to use for specific contracts/parties,
* or the appropriate oracle for a contract.
*/
fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo?
/**
* Adds a node to the local cache (generally only used for adding ourselves)
*/
fun addNode(node: NodeInfo)
/**
* Removes a node from the local cache
*/
fun removeNode(node: NodeInfo)
}
/**
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : NetworkMapCache {
override val regulators: List<NodeInfo>
get() = get(RegulatorService.Type)
override val timestampingNodes: List<NodeInfo>
get() = get(TimestamperService.Type)
override val ratesOracleNodes: List<NodeInfo>
get() = get(NodeInterestRates.Type)
override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value }
private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node
}
override fun removeNode(node: NodeInfo) {
registeredNodes.remove(node.identity)
}
}
sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */
class DeregistrationFailed : NetworkCacheError()
}

View File

@ -0,0 +1,9 @@
package core.node.services
/**
* Placeholder interface for regulator services.
*/
interface RegulatorService {
object Type : ServiceType("corda.regulator")
}

View File

@ -12,6 +12,11 @@ import java.security.PublicKey
import java.time.Clock
import java.util.*
/**
* Postfix for base topics when sending a request to a service.
*/
val TOPIC_DEFAULT_POSTFIX = ".0"
/**
* This file defines various 'services' which are not currently fleshed out. A service is a module that provides
* immutable snapshots of data that may be changing in response to user or network events.

View File

@ -7,23 +7,41 @@
*/
package core.testing
import co.paralleluniverse.common.util.VisibleForTesting
import core.Party
import core.crypto.DummyPublicKey
import core.messaging.SingleMessageRecipient
import core.node.services.NetworkMapCache
import core.node.services.InMemoryNetworkMapCache
import core.node.NodeInfo
import java.util.*
class MockNetworkMapCache : NetworkMapCache {
data class MockAddress(val id: String) : SingleMessageRecipient
override val timestampingNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val ratesOracleNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val partyNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val regulators = Collections.synchronizedList(ArrayList<NodeInfo>())
/**
* Network map cache with no backing map service.
*/
class MockNetworkMapCache() : InMemoryNetworkMapCache() {
data class MockAddress(val id: String): SingleMessageRecipient
init {
partyNodes.add(NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C"))))
partyNodes.add(NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D"))))
var mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")))
var mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")))
registeredNodes[mockNodeA.identity] = mockNodeA
registeredNodes[mockNodeB.identity] = mockNodeB
}
/**
* Directly add a registration to the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun addRegistration(node: NodeInfo) {
registeredNodes[node.identity] = node
}
/**
* Directly remove a registration from the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun deleteRegistration(identity: Party) : Boolean {
return registeredNodes.remove(identity) != null
}
}

View File

@ -114,8 +114,11 @@ abstract class Simulation(val runAsync: Boolean,
// Now wire up the network maps for each node.
// TODO: This is obviously bogus: there should be a single network map for the whole simulated network.
for (node in regulators + serviceProviders + banks) {
(node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes += ratesOracle.info
(node.services.networkMapCache as MockNetworkMapCache).regulators += regulators.map { it.info }
val cache = (node.services.networkMapCache as MockNetworkMapCache)
cache.addRegistration(ratesOracle.info)
regulators.forEach { regulator ->
cache.addRegistration(regulator.info)
}
}
}

View File

@ -0,0 +1,9 @@
package core.utilities
/**
* Enum for when adding/removing something, for example adding or removing an entry in a directory.
*/
enum class AddOrRemove {
ADD,
REMOVE
}

View File

@ -94,10 +94,10 @@ fun main(args: Array<String>) {
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, DemoClock()).start() }
// Add self to network map
(node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(node.info)
(node.services.networkMapCache as MockNetworkMapCache).addRegistration(node.info)
// Add rates oracle to network map
(node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes.add(rateOracleId)
// Add rates oracle to network map if one has been specified
rateOracleId?.let { (node.services.networkMapCache as MockNetworkMapCache).addRegistration(it) }
val hostAndPortStrings = options.valuesOf(fakeTradeWithAddr)
val identityFiles = options.valuesOf(fakeTradeWithIdentityFile)
@ -107,7 +107,7 @@ fun main(args: Array<String>) {
for ((hostAndPortString, identityFile) in hostAndPortStrings.zip(identityFiles)) {
try {
val peerId = nodeInfo(hostAndPortString, identityFile)
(node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(peerId)
(node.services.networkMapCache as MockNetworkMapCache).addRegistration(peerId)
node.services.identityService.registerIdentity(peerId.identity)
} catch (e: Exception) {
}