Merged in rnicoll-network-map-service (pull request #57)

Add NetworkMapService
This commit is contained in:
Ross Nicoll 2016-04-27 16:41:17 +01:00
commit 821efd91bb
22 changed files with 1034 additions and 161 deletions

View File

@ -20,7 +20,9 @@ if [[ "$mode" == "nodeA" ]]; then
RC=83
while [ $RC -eq 83 ]
do
build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost --fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public
build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost \
--fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public \
--network-map-identity-file=nodeA/identity-public --network-map-address=localhost
RC=$?
done
elif [[ "$mode" == "nodeB" ]]; then
@ -35,7 +37,9 @@ elif [[ "$mode" == "nodeB" ]]; then
RC=83
while [ $RC -eq 83 ]
do
build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 --fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public &
build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 \
--fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public \
-network-map-identity-file=nodeA/identity-public --network-map-address=localhost &
while ! curl -F rates=@scripts/example.rates.txt http://localhost:31341/upload/interest-rates; do
echo "Retry to upload interest rates to oracle after 5 seconds"
sleep 5

View File

@ -3,14 +3,19 @@ package core.node
import api.APIServer
import api.APIServerImpl
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.Party
import core.crypto.generateKeyPair
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.node.services.*
import core.random63BitValue
import core.serialization.deserialize
import core.serialization.serialize
import core.testing.MockNetworkMapCache
import core.utilities.AddOrRemove
import core.utilities.AffinityExecutor
import org.slf4j.Logger
import java.nio.file.FileAlreadyExistsException
@ -18,18 +23,32 @@ import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
/**
* A base node implementation that can be customised either for production (with real implementations that do real
* I/O), or a mock implementation suitable for unit test environments.
*/
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val timestamperAddress: NodeInfo?, val platformClock: Clock) {
// TODO: Where this node is the initial network map service, currently no initialNetworkMapAddress is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val initialNetworkMapAddress: NodeInfo?,
val advertisedServices: Set<ServiceType>, val platformClock: Clock) {
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
}
val networkMapServiceCallTimeout: Duration = Duration.ofSeconds(1)
// TODO: Persist this, as well as whether the node is registered.
/**
* Sequence number of changes sent to the network map service, when registering/de-registering this node
*/
var networkMapSeq: Long = 1
protected abstract val log: Logger
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
@ -43,7 +62,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val services = object : ServiceHub {
override val networkService: MessagingService get() = net
override val networkMapCache: NetworkMapCache = MockNetworkMapCache()
override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache()
override val storageService: StorageService get() = storage
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
@ -53,7 +72,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
val info: NodeInfo by lazy {
NodeInfo(net.myAddress, storage.myLegalIdentity, emptySet(), findMyLocation())
NodeInfo(net.myAddress, storage.myLegalIdentity, advertisedServices, findMyLocation())
}
protected open fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
@ -62,6 +81,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var smm: StateMachineManager
lateinit var wallet: WalletService
lateinit var keyManagement: E2ETestKeyManagementService
var inNodeNetworkMapService: NetworkMapService? = null
var inNodeTimestampingService: NodeTimestamperService? = null
lateinit var identity: IdentityService
lateinit var net: MessagingService
@ -77,31 +97,69 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
keyManagement = E2ETestKeyManagementService()
makeInterestRatesOracleService()
api = APIServerImpl(this)
makeTimestampingService(timestamperAddress)
// Build services we're advertising
if (NetworkMapService.Type in info.advertisedServices) makeNetworkMapService()
if (TimestamperService.Type in info.advertisedServices) makeTimestampingService()
identity = makeIdentityService()
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
startMessagingService()
require(initialNetworkMapAddress == null || NetworkMapService.Type in initialNetworkMapAddress.advertisedServices)
{ "Initial network map address must indicate a node that provides a network map service" }
configureNetworkMapCache(initialNetworkMapAddress)
return this
}
private fun makeTimestampingService(timestamperAddress: NodeInfo?) {
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
val tsid = if (timestamperAddress != null) {
inNodeTimestampingService = null
require(TimestamperService.Type in timestamperAddress.advertisedServices) {
"Timestamper address must indicate a node that provides timestamping services, actually " +
"has ${timestamperAddress.advertisedServices}"
}
timestamperAddress
} else {
inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock)
NodeInfo(net.myAddress, storage.myLegalIdentity, setOf(TimestamperService.Type))
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
private fun configureNetworkMapCache(networkMapAddress: NodeInfo?) {
services.networkMapCache.addNode(info)
if (initialNetworkMapAddress != null) {
// TODO: Return a future so the caller knows these operations may not have completed yet, and can monitor
// if needed
updateRegistration(initialNetworkMapAddress, AddOrRemove.ADD)
services.networkMapCache.addMapService(this.smm, net, initialNetworkMapAddress, true, null)
}
(services.networkMapCache as MockNetworkMapCache).timestampingNodes.add(tsid)
if (inNodeNetworkMapService != null) {
// Register for updates
services.networkMapCache.addMapService(this.smm, net, info, true, null)
}
}
private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture<NetworkMapService.RegistrationResponse> {
// Register this node against the network
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
val message = net.createMessage(NetworkMapService.REGISTER_PROTOCOL_TOPIC + ".0", request.serialize().bits)
val future = SettableFuture.create<NetworkMapService.RegistrationResponse>()
val topic = NetworkMapService.REGISTER_PROTOCOL_TOPIC + "." + sessionID
net.runOnNextMessage(topic, MoreExecutors.directExecutor()) { message ->
future.set(message.data.deserialize())
}
net.send(message, serviceInfo.address)
return future
}
open protected fun makeNetworkMapService() {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires)
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
}
open protected fun makeTimestampingService() {
inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock)
}
lateinit var interestRatesService: NodeInterestRates.Service
@ -113,26 +171,32 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
protected open fun makeIdentityService(): IdentityService {
// We don't have any identity infrastructure right now, so we just throw together the only identities we
// know about: our own, the identity of the remote timestamper node (if any), plus whatever is in the
// network map.
//
val service = InMemoryIdentityService()
if (timestamperAddress != null)
service.registerIdentity(timestamperAddress.identity)
if (initialNetworkMapAddress != null)
service.registerIdentity(initialNetworkMapAddress.identity)
service.registerIdentity(storage.myLegalIdentity)
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
// TODO: Subscribe to updates to the network map cache
return service
}
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
// to indicate "Please shut down gracefully" vs "Shut down now".
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process.
net.stop()
}
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun startMessagingService()
protected open fun initialiseStorageService(dir: Path): StorageService {
val attachments = makeAttachmentStorage(dir)
_servicesThatAcceptUploads += attachments
@ -184,4 +248,3 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics)
}
}

View File

@ -7,6 +7,7 @@ import com.codahale.metrics.JmxReporter
import com.google.common.net.HostAndPort
import core.messaging.MessagingService
import core.node.services.ArtemisMessagingService
import core.node.services.ServiceType
import core.node.servlets.AttachmentDownloadServlet
import core.node.servlets.DataUploadServlet
import core.utilities.AffinityExecutor
@ -40,12 +41,15 @@ class ConfigurationException(message: String) : Exception(message)
* @param p2pAddr The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param configuration This is typically loaded from a .properties file
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
* @param networkMapAddress An external network map service to use. Should only ever be null when creating the first
* network map service, while bootstrapping a network.
* @param advertisedServices The services this node advertises. This must be a subset of the services it runs,
* but nodes are not required to advertise services they run (hence subset).
* @param clock The clock used within the node and by all protocols etc
*/
class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration,
timestamperAddress: NodeInfo?,
clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, timestamperAddress, clock) {
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
val DEFAULT_PORT = 31337
@ -63,6 +67,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, serverThread)
override fun startMessagingService() {
// Start up the MQ service.
(net as ArtemisMessagingService).start()
}
private fun initWebServer(): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
@ -114,8 +123,6 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
alreadyRunningNodeCheck()
super.start()
webServer = initWebServer()
// Start up the MQ service.
(net as ArtemisMessagingService).start()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).

View File

@ -12,28 +12,46 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
abstract class AbstractNodeService(val net: MessagingService) {
/**
* Postfix for base topics when sending a request to a service.
*/
protected val topicDefaultPostfix = ".0"
/**
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
* common boilerplate code.
* common boilerplate code. Exceptions are caught and passed to the provided consumer.
*
* @param topic the topic, without the default session ID postfix (".0)
* @param handler a function to handle the deserialised request and return a response
* @param exceptionConsumer a function to which any thrown exception is passed.
*/
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
addMessageHandler(topic: String,
crossinline handler: (Q) -> R,
crossinline exceptionHandler: (Message, Exception) -> Unit) {
net.addMessageHandler(topic + topicDefaultPostfix, null) { message, r ->
crossinline exceptionConsumer: (Message, Exception) -> Unit) {
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
try {
val req = message.data.deserialize<Q>()
val data = handler(req)
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
net.send(msg, req.replyTo)
} catch(e: Exception) {
exceptionHandler(message, e)
exceptionConsumer(message, e)
}
}
}
}
/**
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
* common boilerplate code. Exceptions are propagated to the messaging layer.
*
* @param topic the topic, without the default session ID postfix (".0)
* @param handler a function to handle the deserialised request and return a response
*/
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
addMessageHandler(topic: String,
crossinline handler: (Q) -> R) {
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
val req = message.data.deserialize<Q>()
val data = handler(req)
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
net.send(msg, req.replyTo)
}
}
}

View File

@ -1,21 +1,222 @@
package core.node.services
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.Contract
import core.Party
import core.crypto.SecureHash
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.node.NodeInfo
import core.random63BitValue
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.AddOrRemove
import org.slf4j.LoggerFactory
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* A network map contains lists of nodes on the network along with information about their identity keys, services
* they provide and host names or IP addresses where they can be connected to. A reasonable architecture for the
* network map service might be one like the Tor directory authorities, where several nodes linked by RAFT or Paxos
* elect a leader and that leader distributes signed documents describing the network layout. Those documents can
* then be cached by every node and thus a network map can be retrieved given only a single successful peer connection.
*
* This interface assumes fast, synchronous access to an in-memory map.
* they provide and host names or IP addresses where they can be connected to. The cache wraps around a map fetched
* from an authoritative service, and adds easy lookup of the data stored within it. Generally it would be initialised
* with a specified network map service, which it fetches data from and then subscribes to updates of.
*/
interface NetworkMapCache {
companion object {
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
}
/** A list of nodes that advertise a network map service */
val networkMapNodes: List<NodeInfo>
/** A list of nodes that advertise a timestamping service */
val timestampingNodes: List<NodeInfo>
/** A list of nodes that advertise a rates oracle service */
val ratesOracleNodes: List<NodeInfo>
/** A list of all nodes the cache is aware of */
val partyNodes: List<NodeInfo>
/** A list of nodes that advertise a regulatory service. Identifying the correct regulator for a trade is outwith
* the scope of the network map service, and this is intended solely as a sanity check on configuration stored
* elsewhere.
*/
val regulators: List<NodeInfo>
/**
* Look up the node info for a party.
*/
fun nodeForPartyName(name: String): NodeInfo? = partyNodes.singleOrNull { it.identity.name == name }
/**
* Get a copy of all nodes in the map.
*/
fun get(): Collection<NodeInfo>
/**
* Get the collection of nodes which advertise a specific service.
*/
fun get(serviceType: ServiceType): Collection<NodeInfo>
/**
* Get a recommended node that advertises a service, and is suitable for the specified contract and parties.
* Implementations might understand, for example, the correct regulator to use for specific contracts/parties,
* or the appropriate oracle for a contract.
*/
fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo?
/**
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
*
* @param smm state machine manager to use when requesting
* @param net the network messaging service
* @param service the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
*/
fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
/**
* Adds a node to the local cache (generally only used for adding ourselves)
*/
fun addNode(node: NodeInfo)
/**
* Removes a node from the local cache
*/
fun removeNode(node: NodeInfo)
/**
* Deregister from updates from the given map service.
*
* @param smm state machine manager to use when requesting
* @param net the network messaging service
* @param service the network map service to fetch current state from.
*/
fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
}
/**
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>
get() = get(RegulatorService.Type)
override val timestampingNodes: List<NodeInfo>
get() = get(TimestamperService.Type)
override val ratesOracleNodes: List<NodeInfo>
get() = get(NodeInterestRates.Type)
override val partyNodes: List<NodeInfo>
get() = registeredNodes.map { it.value }
private var registeredForPush = false
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX,
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {
NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) {
NetworkMapCache.logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
future.set(Unit)
}
net.send(net.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address)
return future
}
override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node
}
override fun removeNode(node: NodeInfo) {
registeredNodes.remove(node.identity)
}
/**
* Unsubscribes from updates from the given map service.
*
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
if (resp.confirmed) {
future.set(Unit)
} else {
future.setException(NetworkCacheError.DeregistrationFailed())
}
}
net.send(net.createMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address)
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
val reg: NodeRegistration
try {
reg = req.wireReg.verified()
} catch(e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
processRegistration(reg)
}
private fun processRegistration(reg: NodeRegistration) {
// TODO: Implement filtering by sequence number, so we only accept changes that are
// more recent than the latest change we've processed.
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
}
sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */
class DeregistrationFailed : NetworkCacheError()
}

View File

@ -0,0 +1,301 @@
package core.node.services
import co.paralleluniverse.common.util.VisibleForTesting
import core.Party
import core.ThreadBox
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.SignedData
import core.crypto.signWithECDSA
import core.messaging.MessageRecipients
import core.messaging.MessagingService
import core.messaging.SingleMessageRecipient
import core.node.NodeInfo
import core.serialization.SerializedBytes
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.AddOrRemove
import org.slf4j.LoggerFactory
import protocols.*
import java.security.PrivateKey
import java.time.Period
import java.time.Instant
import java.util.ArrayList
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
/**
* A network map contains lists of nodes on the network along with information about their identity keys, services
* they provide and host names or IP addresses where they can be connected to. This information is cached locally within
* nodes, by the [NetworkMapCache]. Currently very basic consensus controls are applied, using signed changes which
* replace each other based on a serial number present in the change.
*/
// TODO: A better architecture for the network map service might be one like the Tor directory authorities, where
// several nodes linked by RAFT or Paxos elect a leader and that leader distributes signed documents describing the
// network layout. Those documents can then be cached by every node and thus a network map can/ be retrieved given only
// a single successful peer connection.
//
// It may also be that this is replaced or merged with the identity management service; for example if the network has
// a concept of identity changes over time, should that include the node for an identity? If so, that is likely to
// replace this service.
interface NetworkMapService {
object Type : ServiceType("corda.network_map")
companion object {
val DEFAULT_EXPIRATION_PERIOD = Period.ofWeeks(4)
val FETCH_PROTOCOL_TOPIC = "platform.network_map.fetch"
val QUERY_PROTOCOL_TOPIC = "platform.network_map.query"
val REGISTER_PROTOCOL_TOPIC = "platform.network_map.register"
val SUBSCRIPTION_PROTOCOL_TOPIC = "platform.network_map.subscribe"
// Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache.
// When subscribing to these updates, remember they must be acknowledged
val PUSH_PROTOCOL_TOPIC = "platform.network_map.push"
// Base topic for messages acknowledging pushed updates
val PUSH_ACK_PROTOCOL_TOPIC = "platform.network_map.push_ack"
val logger = LoggerFactory.getLogger(NetworkMapService::class.java)
}
val nodes: List<NodeInfo>
class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
data class QueryIdentityResponse(val node: NodeInfo?)
class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
data class RegistrationResponse(val success: Boolean)
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
data class SubscribeResponse(val confirmed: Boolean)
data class Update(val wireReg: WireNodeRegistration, val replyTo: MessageRecipients)
data class UpdateAcknowledge(val wireRegHash: SecureHash, val replyTo: MessageRecipients)
}
@ThreadSafe
class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net) {
private val registeredNodes = ConcurrentHashMap<Party, NodeRegistration>()
// Map from subscriber address, to a list of unacknowledged updates
private val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, MutableList<SecureHash>>())
private val mapVersion = AtomicInteger(1)
/** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */
val maxUnacknowledgedUpdates = 10
/**
* Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a
* 10 times overhead.
*/
val maxSizeRegistrationRequestBytes = 5500
// Filter reduces this to the entries that add a node to the map
override val nodes: List<NodeInfo>
get() = registeredNodes.mapNotNull { if (it.value.type == AddOrRemove.ADD) it.value.node else null }
init {
// Register the local node with the service
val homeIdentity = home.node.identity
registeredNodes[homeIdentity] = home
// Register message handlers
addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC,
{ req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) }
)
addMessageHandler(NetworkMapService.QUERY_PROTOCOL_TOPIC,
{ req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) }
)
addMessageHandler(NetworkMapService.REGISTER_PROTOCOL_TOPIC,
{ req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) }
)
addMessageHandler(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
{ req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) }
)
net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
processAcknowledge(req)
}
}
private fun addSubscriber(subscriber: MessageRecipients) {
if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked {
if (!containsKey(subscriber)) {
put(subscriber, mutableListOf<SecureHash>())
}
}
}
private fun removeSubscriber(subscriber: MessageRecipients) {
if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked { remove(subscriber) }
}
@VisibleForTesting
fun getUnacknowledgedCount(subscriber: SingleMessageRecipient): Int?
= subscribers.locked { get(subscriber)?.count() }
@VisibleForTesting
fun notifySubscribers(wireReg: WireNodeRegistration) {
// TODO: Once we have a better established messaging system, we can probably send
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, net.myAddress).serialize().bits
val topic = NetworkMapService.PUSH_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX
val message = net.createMessage(topic, update)
subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>()
val hash = SecureHash.sha256(wireReg.raw.bits)
forEach { subscriber: Map.Entry<SingleMessageRecipient, MutableList<SecureHash>> ->
val unacknowledged = subscriber.value
if (unacknowledged.count() < maxUnacknowledgedUpdates) {
unacknowledged.add(hash)
net.send(message, subscriber.key)
} else {
toRemove.add(subscriber.key)
}
}
toRemove.forEach { remove(it) }
}
}
@VisibleForTesting
fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit {
subscribers.locked {
this[req.replyTo]?.remove(req.wireRegHash)
}
}
@VisibleForTesting
fun processFetchAllRequest(req: NetworkMapService.FetchMapRequest): NetworkMapService.FetchMapResponse {
if (req.subscribe) {
addSubscriber(req.replyTo)
}
val ver = mapVersion.get()
if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) {
val nodes = ArrayList(registeredNodes.values) // Snapshot to avoid attempting to serialise ConcurrentHashMap internals
return NetworkMapService.FetchMapResponse(nodes, ver)
} else {
return NetworkMapService.FetchMapResponse(null, ver)
}
}
@VisibleForTesting
fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse {
val candidate = registeredNodes[req.identity]
// If the most recent record we have is of the node being removed from the map, then it's considered
// as no match.
if (candidate == null || candidate.type == AddOrRemove.REMOVE) {
return NetworkMapService.QueryIdentityResponse(null)
} else {
return NetworkMapService.QueryIdentityResponse(candidate.node)
}
}
@VisibleForTesting
fun processRegistrationChangeRequest(req: NetworkMapService.RegistrationRequest): NetworkMapService.RegistrationResponse {
require(req.wireReg.raw.size < maxSizeRegistrationRequestBytes)
val change: NodeRegistration
try {
change = req.wireReg.verified()
} catch(e: java.security.SignatureException) {
throw NodeMapError.InvalidSignature()
}
val node = change.node
var changed: Boolean = false
// Update the current value atomically, so that if multiple updates come
// in on different threads, there is no risk of a race condition while checking
// sequence numbers.
registeredNodes.compute(node.identity, { mapKey: Party, existing: NodeRegistration? ->
changed = existing == null || existing.serial < change.serial
if (changed) {
when (change.type) {
AddOrRemove.ADD -> change
AddOrRemove.REMOVE -> change
else -> throw NodeMapError.UnknownChangeType()
}
} else {
existing
}
})
if (changed) {
notifySubscribers(req.wireReg)
// Update the local cache
// TODO: Once local messaging is fixed, this should go over the network layer as it does to other
// subscribers
when (change.type) {
AddOrRemove.ADD -> {
NetworkMapService.logger.info("Added node ${node.address} to network map")
cache.addNode(change.node)
}
AddOrRemove.REMOVE -> {
NetworkMapService.logger.info("Removed node ${node.address} from network map")
cache.removeNode(change.node)
}
}
mapVersion.incrementAndGet()
}
return NetworkMapService.RegistrationResponse(changed)
}
@VisibleForTesting
fun processSubscriptionRequest(req: NetworkMapService.SubscribeRequest): NetworkMapService.SubscribeResponse {
when (req.subscribe) {
false -> removeSubscriber(req.replyTo)
true -> addSubscriber(req.replyTo)
}
return NetworkMapService.SubscribeResponse(true)
}
}
/**
* A node registration state in the network map.
*
* @param node the node being added/removed.
* @param serial an increasing value which represents the version of this registration. Not expected to be sequential,
* but later versions of the registration must have higher values (or they will be ignored by the map service).
* Similar to the serial number on DNS records.
* @param type add if the node is being added to the map, or remove if a previous node is being removed (indicated as
* going offline).
* @param expires when the registration expires. Only used when adding a node to a map.
*/
// TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node
// involves providing both node and paerty, and deregistering a node involves a request with party but no node.
class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
/**
* Build a node registration in wire format.
*/
fun toWire(privateKey: PrivateKey): WireNodeRegistration {
val regSerialized = this.serialize()
val regSig = privateKey.signWithECDSA(regSerialized.bits, node.identity.owningKey)
return WireNodeRegistration(regSerialized, regSig)
}
override fun toString() : String = "$node #${serial} (${type})"
}
/**
* A node registration and its signature as a pair.
*/
class WireNodeRegistration(raw: SerializedBytes<NodeRegistration>, sig: DigitalSignature.WithKey) : SignedData<NodeRegistration>(raw, sig) {
@Throws(IllegalArgumentException::class)
override fun verifyData(reg: NodeRegistration) {
require(reg.node.identity.owningKey == sig.by)
}
}
sealed class NodeMapError : Exception() {
/** Thrown if the signature on the node info does not match the public key for the identity */
class InvalidSignature : NodeMapError()
/** Thrown if the replyTo of a subscription change message is not a single message recipient */
class InvalidSubscriber : NodeMapError()
/** Thrown if a change arrives which is of an unknown type */
class UnknownChangeType : NodeMapError()
}

View File

@ -0,0 +1,9 @@
package core.node.services
/**
* Placeholder interface for regulator services.
*/
interface RegulatorService {
object Type : ServiceType("corda.regulator")
}

View File

@ -12,6 +12,11 @@ import java.security.PublicKey
import java.time.Clock
import java.util.*
/**
* Postfix for base topics when sending a request to a service.
*/
val TOPIC_DEFAULT_POSTFIX = ".0"
/**
* This file defines various 'services' which are not currently fleshed out. A service is a module that provides
* immutable snapshots of data that may be changing in response to user or network events.

View File

@ -7,23 +7,41 @@
*/
package core.testing
import co.paralleluniverse.common.util.VisibleForTesting
import core.Party
import core.crypto.DummyPublicKey
import core.messaging.SingleMessageRecipient
import core.node.services.NetworkMapCache
import core.node.services.InMemoryNetworkMapCache
import core.node.NodeInfo
import java.util.*
class MockNetworkMapCache : NetworkMapCache {
data class MockAddress(val id: String) : SingleMessageRecipient
override val timestampingNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val ratesOracleNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val partyNodes = Collections.synchronizedList(ArrayList<NodeInfo>())
override val regulators = Collections.synchronizedList(ArrayList<NodeInfo>())
/**
* Network map cache with no backing map service.
*/
class MockNetworkMapCache() : InMemoryNetworkMapCache() {
data class MockAddress(val id: String): SingleMessageRecipient
init {
partyNodes.add(NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C"))))
partyNodes.add(NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D"))))
var mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")))
var mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")))
registeredNodes[mockNodeA.identity] = mockNodeA
registeredNodes[mockNodeB.identity] = mockNodeB
}
/**
* Directly add a registration to the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun addRegistration(node: NodeInfo) {
registeredNodes[node.identity] = node
}
/**
* Directly remove a registration from the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun deleteRegistration(identity: Party) : Boolean {
return registeredNodes.remove(identity) != null
}
}

View File

@ -1,6 +1,8 @@
package core.testing
import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import core.Party
import core.messaging.MessagingService
import core.messaging.SingleMessageRecipient
@ -8,7 +10,7 @@ import core.node.AbstractNode
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.PhysicalLocation
import core.testing.MockIdentityService
import core.node.services.NetworkMapService
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.utilities.AffinityExecutor
@ -46,18 +48,18 @@ class MockNetwork(private val threadPerNode: Boolean = false,
/** Allows customisation of how nodes are created. */
interface Factory {
fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
timestamperAddr: NodeInfo?, id: Int): MockNode
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode
}
object DefaultFactory : Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
timestamperAddr: NodeInfo?, id: Int): MockNode {
return MockNode(dir, config, network, timestamperAddr, id)
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode {
return MockNode(dir, config, network, networkMapAddr, advertisedServices, id)
}
}
open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork,
withTimestamper: NodeInfo?, val id: Int) : AbstractNode(dir, config, withTimestamper, Clock.systemUTC()) {
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, val id: Int) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) {
override val log: Logger = loggerFor<MockNode>()
override val serverThread: AffinityExecutor =
if (mockNet.threadPerNode)
@ -75,6 +77,10 @@ class MockNetwork(private val threadPerNode: Boolean = false,
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
override fun startMessagingService() {
// Nothing to do
}
// There is no need to slow down the unit tests by initialising CityDatabase
override fun findMyLocation(): PhysicalLocation? = null
@ -88,8 +94,8 @@ class MockNetwork(private val threadPerNode: Boolean = false,
}
/** Returns a started node, optionally created by the passed factory method */
fun createNode(withTimestamper: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
advertisedServices: Set<ServiceType> = emptySet()): MockNode {
fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
vararg advertisedServices: ServiceType): MockNode {
val newNode = forcedID == -1
val id = if (newNode) counter++ else forcedID
@ -101,8 +107,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
override val exportJMXto: String = ""
override val nearestCity: String = "Atlantis"
}
val node = nodeFactory.create(path, config, this, withTimestamper, id).start()
node.info.advertisedServices = advertisedServices
val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id).start()
_nodes.add(node)
return node
}
@ -123,12 +128,13 @@ class MockNetwork(private val threadPerNode: Boolean = false,
}
/**
* Sets up a two node network in which the first node runs a timestamping service and the other doesn't.
* Sets up a two node network, in which the first node runs network map and timestamping services and the other
* doesn't.
*/
fun createTwoNodes(nodeFactory: Factory = defaultFactory): Pair<MockNode, MockNode> {
require(nodes.isEmpty())
return Pair(
createNode(null, -1, nodeFactory, setOf(TimestamperService.Type)),
createNode(null, -1, nodeFactory, NetworkMapService.Type, TimestamperService.Type),
createNode(nodes[0].info, -1, nodeFactory)
)
}

View File

@ -5,6 +5,10 @@ import core.node.CityDatabase
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.PhysicalLocation
import core.node.services.NetworkMapService
import core.node.services.NodeInterestRates
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.protocols.ProtocolLogic
import core.then
import core.utilities.ProgressTracker
@ -32,14 +36,15 @@ abstract class Simulation(val runAsync: Boolean,
// This puts together a mock network of SimulatedNodes.
open class SimulatedNode(dir: Path, config: NodeConfiguration, mockNet: MockNetwork,
withTimestamper: NodeInfo?, id: Int) : MockNetwork.MockNode(dir, config, mockNet, withTimestamper, id) {
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int) : MockNetwork.MockNode(dir, config, mockNet, networkMapAddress, advertisedServices, id) {
override fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
}
inner class BankFactory : MockNetwork.Factory {
var counter = 0
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
val letter = 'A' + counter
val city = bankLocations[counter++ % bankLocations.size]
val cfg = object : NodeConfiguration {
@ -48,52 +53,71 @@ abstract class Simulation(val runAsync: Boolean,
override val exportJMXto: String = ""
override val nearestCity: String = city
}
return SimulatedNode(dir, cfg, network, timestamperAddr, id)
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id)
}
fun createAll(): List<SimulatedNode> = bankLocations.map { network.createNode(timestamper.info, nodeFactory = this) as SimulatedNode }
fun createAll(): List<SimulatedNode> = bankLocations.
map { network.createNode(networkMap.info, nodeFactory = this) as SimulatedNode }
}
val bankFactory = BankFactory()
object NetworkMapNodeFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(NetworkMapService.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Network Map Service Provider"
override val exportJMXto: String = ""
override val nearestCity: String = "Madrid"
}
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) { }
}
}
object TimestampingNodeFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(TimestamperService.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Timestamping Service" // A magic string recognised by the CP contract
override val exportJMXto: String = ""
override val nearestCity: String = "Zurich"
}
return SimulatedNode(dir, cfg, network, timestamperAddr, id)
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id)
}
}
object RatesOracleFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(NodeInterestRates.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Rates Service Provider"
override val exportJMXto: String = ""
override val nearestCity: String = "Madrid"
}
val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, id) {
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) {
override fun makeInterestRatesOracleService() {
super.makeInterestRatesOracleService()
interestRatesService.upload(javaClass.getResourceAsStream("example.rates.txt"))
}
}
return n
}
}
object RegulatorFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Regulator A"
override val exportJMXto: String = ""
override val nearestCity: String = "Paris"
}
val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, id) {
val n = object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) {
// TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request.
// So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it.
// But that's fine for visualisation purposes.
@ -105,17 +129,19 @@ abstract class Simulation(val runAsync: Boolean,
val network = MockNetwork(false)
val regulators: List<SimulatedNode> = listOf(network.createNode(null, nodeFactory = RegulatorFactory) as SimulatedNode)
val timestamper: SimulatedNode = network.createNode(null, nodeFactory = TimestampingNodeFactory) as SimulatedNode
val ratesOracle: SimulatedNode = network.createNode(null, nodeFactory = RatesOracleFactory) as SimulatedNode
val networkMap: SimulatedNode
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
val timestamper: SimulatedNode
= network.createNode(null, nodeFactory = TimestampingNodeFactory, advertisedServices = TimestamperService.Type) as SimulatedNode
val ratesOracle: SimulatedNode
= network.createNode(null, nodeFactory = RatesOracleFactory, advertisedServices = NodeInterestRates.Type) as SimulatedNode
val serviceProviders: List<SimulatedNode> = listOf(timestamper, ratesOracle)
val banks: List<SimulatedNode> = bankFactory.createAll()
init {
// Now wire up the network maps for each node.
// TODO: This is obviously bogus: there should be a single network map for the whole simulated network.
for (node in regulators + serviceProviders + banks) {
(node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes += ratesOracle.info
(node.services.networkMapCache as MockNetworkMapCache).regulators += regulators.map { it.info }
node.services.networkMapCache.addNode(node.info)
}
}

View File

@ -0,0 +1,9 @@
package core.utilities
/**
* Enum for when adding/removing something, for example adding or removing an entry in a directory.
*/
enum class AddOrRemove {
ADD,
REMOVE
}

View File

@ -8,12 +8,8 @@ import core.node.Node
import core.node.NodeConfiguration
import core.node.NodeConfigurationFromConfig
import core.node.NodeInfo
import core.node.services.ArtemisMessagingService
import core.node.services.NodeInterestRates
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.node.services.*
import core.serialization.deserialize
import core.testing.MockNetworkMapCache
import core.utilities.BriefLogFormatter
import demos.protocols.AutoOfferProtocol
import demos.protocols.ExitServerProtocol
@ -33,15 +29,8 @@ fun main(args: Array<String>) {
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("nodedata")
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").withRequiredArg().required()
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
val rateOracleIdentityFile = parser.accepts("rates-oracle-identity-file").withRequiredArg().required()
val rateOracleNetAddr = parser.accepts("rates-oracle-address").requiredIf(rateOracleIdentityFile).withRequiredArg()
val networkMapIdentityFile = parser.accepts("network-map-identity-file").withRequiredArg()
val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg()
// Use these to list one or more peers (again, will be superseded by discovery implementation)
val fakeTradeWithAddr = parser.accepts("fake-trade-with-address").withRequiredArg().required()
@ -66,39 +55,26 @@ fun main(args: Array<String>) {
}
val config = loadConfigFile(configFile)
val advertisedServices: Set<ServiceType>
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(Node.DEFAULT_PORT)
// The timestamping node runs in the same process as the one that passes null to Node constructor.
val timestamperId = if (options.valueOf(timestamperNetAddr).equals(options.valueOf(networkAddressArg))) {
val networkMapId = if (options.valueOf(networkMapNetAddr).equals(options.valueOf(networkAddressArg))) {
// This node provides network map and timestamping services
advertisedServices = setOf(NetworkMapService.Type, TimestamperService.Type)
null
} else {
advertisedServices = setOf(NodeInterestRates.Type)
try {
nodeInfo(options.valueOf(timestamperNetAddr), options.valueOf(timestamperIdentityFile), setOf(TimestamperService.Type))
nodeInfo(options.valueOf(networkMapNetAddr), options.valueOf(networkMapIdentityFile), setOf(NetworkMapService.Type, TimestamperService.Type))
} catch (e: Exception) {
null
}
}
// The timestamping node runs in the same process as the one that passes null to Node constructor.
val rateOracleId = if (options.valueOf(rateOracleNetAddr).equals(options.valueOf(networkAddressArg))) {
null
} else {
try {
nodeInfo(options.valueOf(rateOracleNetAddr), options.valueOf(rateOracleIdentityFile), setOf(NodeInterestRates.Type))
} catch (e: Exception) {
null
}
}
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, DemoClock()).start() }
// Add self to network map
(node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(node.info)
// Add rates oracle to network map
(node.services.networkMapCache as MockNetworkMapCache).ratesOracleNodes.add(rateOracleId)
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices, DemoClock()).start() }
// TODO: This should all be replaced by the identity service being updated
// as the network map changes.
val hostAndPortStrings = options.valuesOf(fakeTradeWithAddr)
val identityFiles = options.valuesOf(fakeTradeWithIdentityFile)
if (hostAndPortStrings.size != identityFiles.size) {
@ -107,9 +83,9 @@ fun main(args: Array<String>) {
for ((hostAndPortString, identityFile) in hostAndPortStrings.zip(identityFiles)) {
try {
val peerId = nodeInfo(hostAndPortString, identityFile)
(node.services.networkMapCache as MockNetworkMapCache).partyNodes.add(peerId)
node.services.identityService.registerIdentity(peerId.identity)
} catch (e: Exception) {
println("Could not load peer identity file \"${identityFile}\".")
}
}

View File

@ -7,6 +7,7 @@ import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.services.ArtemisMessagingService
import core.node.services.NodeInterestRates
import core.node.services.ServiceType
import core.serialization.deserialize
import core.utilities.ANSIProgressRenderer
import core.utilities.BriefLogFormatter
@ -26,6 +27,8 @@ fun main(args: Array<String>) {
val parser = OptionParser()
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data")
val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required()
val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required()
val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required()
val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required()
@ -49,6 +52,10 @@ fun main(args: Array<String>) {
Files.createDirectory(dir)
}
val networkMapAddr = ArtemisMessagingService.makeRecipient(options.valueOf(networkMapAddrArg))
val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>()
val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity)
// Load oracle stuff (in lieu of having a network map service)
val oracleAddr = ArtemisMessagingService.makeRecipient(options.valueOf(oracleAddrArg))
val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>()
@ -59,13 +66,15 @@ fun main(args: Array<String>) {
val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg))
// Bring up node.
var advertisedServices: Set<ServiceType> = emptySet()
val myNetAddr = ArtemisMessagingService.toHostAndPort(options.valueOf(networkAddressArg))
val config = object : NodeConfiguration {
override val myLegalName: String = "Rate fix demo node"
override val exportJMXto: String = "http"
override val nearestCity: String = "Atlantis"
}
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, null).start() }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapAddress, advertisedServices).start() }
// Make a garbage transaction that includes a rate fix.
val tx = TransactionBuilder()

View File

@ -12,10 +12,7 @@ import core.node.Node
import core.node.NodeConfiguration
import core.node.NodeConfigurationFromConfig
import core.node.NodeInfo
import core.node.services.ArtemisMessagingService
import core.node.services.NodeAttachmentService
import core.node.services.NodeWalletService
import core.node.services.TimestamperService
import core.node.services.*
import core.protocols.ProtocolLogic
import core.serialization.deserialize
import core.utilities.ANSIProgressRenderer
@ -48,12 +45,8 @@ fun main(args: Array<String>) {
val serviceFakeTradesArg = parser.accepts("service-fake-trades")
val fakeTradeWithArg = parser.accepts("fake-trade-with").requiredUnless(serviceFakeTradesArg).withRequiredArg()
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
val networkMapIdentityFile = parser.accepts("network-map-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg()
val options = try {
parser.parse(*args)
@ -75,6 +68,7 @@ fun main(args: Array<String>) {
val config = loadConfigFile(configFile)
var advertisedServices: Set<ServiceType> = emptySet()
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(Node.DEFAULT_PORT)
val listening = options.has(serviceFakeTradesArg)
@ -83,15 +77,14 @@ fun main(args: Array<String>) {
exitProcess(1)
}
// The timestamping node runs in the same process as the buyer protocol is run.
val timestamperId = if (options.has(timestamperIdentityFile)) {
val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(Node.DEFAULT_PORT)
val path = Paths.get(options.valueOf(timestamperIdentityFile))
val networkMapId = if (options.has(networkMapIdentityFile)) {
val addr = HostAndPort.fromString(options.valueOf(networkMapNetAddr)).withDefaultPort(Node.DEFAULT_PORT)
val path = Paths.get(options.valueOf(networkMapIdentityFile))
val party = Files.readAllBytes(path).deserialize<Party>()
NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, advertisedServices = setOf(TimestamperService.Type))
NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, setOf(NetworkMapService.Type))
} else null
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId).start() }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices).start() }
if (listening) {
// For demo purposes just extract attachment jars when saved to disk, so the user can explore them.

View File

@ -57,7 +57,7 @@ object AutoOfferProtocol {
progressTracker.currentStep = DEALING
// TODO required as messaging layer does not currently queue messages that arrive before we expect them
Thread.sleep(100)
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.timestamperAddress!!,
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.services.networkMapCache.timestampingNodes.first(),
autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.childrenFor[DEALING]!!)
val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller)
// This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it
@ -93,12 +93,15 @@ object AutoOfferProtocol {
@Suspendable
override fun call(): SignedTransaction {
require(serviceHub.networkMapCache.timestampingNodes.isNotEmpty()) { "No timestamping nodes registered" }
val ourSessionID = random63BitValue()
val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes[0]
val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes.first()
// need to pick which ever party is not us
val otherParty = notUs(*dealToBeOffered.parties).single()
val otherSide = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name))!!.address
val otherNode = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name))
requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } }
val otherSide = otherNode!!.address
progressTracker.currentStep = ANNOUNCING
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, ourSessionID, dealToBeOffered))
progressTracker.currentStep = DEALING

View File

@ -96,7 +96,8 @@ class MockServices(
val net: MessagingService? = null,
val identity: IdentityService? = MockIdentityService,
val storage: StorageService? = MockStorageService(),
val networkMap: NetworkMapCache? = MockNetworkMapCache(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = Clock.systemUTC()
) : ServiceHub {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
@ -108,7 +109,7 @@ class MockServices(
override val networkService: MessagingService
get() = net ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache
get() = networkMap ?: throw UnsupportedOperationException()
get() = mapCache ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
override val clock: Clock

View File

@ -5,7 +5,9 @@ import core.crypto.SecureHash
import core.crypto.sha256
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.services.NetworkMapService
import core.node.services.NodeAttachmentService
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.serialization.OpaqueBytes
import core.testing.MockNetwork
@ -86,9 +88,10 @@ class AttachmentTests {
@Test
fun maliciousResponse() {
// Make a node that doesn't do sanity checking at load time.
val n0 = network.createNode(null, nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, id) {
val n0 = network.createNode(null, -1, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) {
override fun start(): MockNetwork.MockNode {
super.start()
(storage.attachments as NodeAttachmentService).checkAttachmentsOnLoad = false
@ -96,7 +99,7 @@ class AttachmentTests {
}
}
}
}, advertisedServices = setOf(TimestamperService.Type))
}, NetworkMapService.Type, TimestamperService.Type)
val n1 = network.createNode(n0.info)
// Insert an attachment into node zero's store directly.

View File

@ -98,8 +98,12 @@ class TwoPartyTradeProtocolTests {
var (aliceNode, bobNode) = net.createTwoNodes()
val aliceAddr = aliceNode.net.myAddress
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle
val networkMapAddr = aliceNode.info
val timestamperAddr = aliceNode.info
// Clear network map registration messages through before we start
net.runNetwork()
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(2000.DOLLARS)
val alicesFakePaper = fillUpForSeller(false, timestamperAddr.identity, null).second
@ -153,9 +157,10 @@ class TwoPartyTradeProtocolTests {
// ... bring the node back up ... the act of constructing the SMM will re-register the message handlers
// that Bob was waiting on before the reboot occurred.
bobNode = net.createNode(timestamperAddr, bobAddr.id, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, bobAddr.id) {
bobNode = net.createNode(networkMapAddr, bobAddr.id, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, bobAddr.id) {
override fun initialiseStorageService(dir: Path): StorageService {
val ss = super.initialiseStorageService(dir)
val smMap = ss.stateMachines
@ -183,10 +188,12 @@ class TwoPartyTradeProtocolTests {
// Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order
// of gets and puts.
private fun makeNodeWithTracking(name: String): MockNetwork.MockNode {
val networkMapAddr: NodeInfo? = null
// Create a node in the mock network ...
return net.createNode(nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, id) {
return net.createNode(null, nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) {
// That constructs the storage service object in a customised way ...
override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl {
// To use RecordingMaps instead of ordinary HashMaps.

View File

@ -55,7 +55,7 @@ class TimestamperNodeServiceTest {
@Test
fun successWithNetwork() {
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
val timestamperNode = network.createNode(null, advertisedServices = TimestamperService.Type)
val logName = NodeTimestamperService.TIMESTAMPING_PROTOCOL_TOPIC
val psm = TestPSM(timestamperNode.info, clock.instant())
val future = timestamperNode.smm.add(logName, psm)
@ -66,7 +66,7 @@ class TimestamperNodeServiceTest {
@Test
fun wrongCommands() {
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
val timestamperNode = network.createNode(null, advertisedServices = TimestamperService.Type)
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
val service = timestamperNode.inNodeTimestampingService!!
@ -86,7 +86,7 @@ class TimestamperNodeServiceTest {
@Test
fun tooEarly() {
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
val timestamperNode = network.createNode(null, advertisedServices = TimestamperService.Type)
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
val service = timestamperNode.inNodeTimestampingService!!
@ -100,7 +100,7 @@ class TimestamperNodeServiceTest {
@Test
fun tooLate() {
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
val timestamperNode = network.createNode(null, advertisedServices = TimestamperService.Type)
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
val service = timestamperNode.inNodeTimestampingService!!
@ -114,7 +114,7 @@ class TimestamperNodeServiceTest {
@Test
fun success() {
val timestamperNode = network.createNode(null, advertisedServices = setOf(TimestamperService.Type))
val timestamperNode = network.createNode(null, advertisedServices = TimestamperService.Type)
val timestamperKey = timestamperNode.services.storageService.myLegalIdentity.owningKey
val service = timestamperNode.inNodeTimestampingService!!

View File

@ -0,0 +1,23 @@
package core.node.services
import core.testing.MockNetwork
import org.junit.Before
import org.junit.Test
class InMemoryNetworkMapCacheTest {
lateinit var network: MockNetwork
@Before
fun setup() {
network = MockNetwork()
}
@Test
fun registerWithNetwork() {
val (n0, n1) = network.createTwoNodes()
val future = n1.services.networkMapCache.addMapService(n1.smm, n1.net, n0.info, false, null)
network.runNetwork()
future.get()
}
}

View File

@ -0,0 +1,191 @@
package core.node.services
import co.paralleluniverse.fibers.Suspendable
import core.*
import core.crypto.SecureHash
import core.crypto.signWithECDSA
import core.node.NodeInfo
import core.protocols.ProtocolLogic
import core.serialization.serialize
import core.testing.MockNetwork
import core.utilities.AddOrRemove
import core.utilities.BriefLogFormatter
import org.junit.Before
import org.junit.Test
import java.security.PrivateKey
import java.time.Instant
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
class InMemoryNetworkMapServiceTest {
lateinit var network: MockNetwork
init {
BriefLogFormatter.init()
}
@Before
fun setup() {
network = MockNetwork()
}
/**
* Perform basic tests of registering, de-registering and fetching the full network map.
*/
@Test
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService
// Confirm the service contains only its own node
assertEquals(1, service.nodes.count())
assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Register the second node
var seq = 1L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val nodeKey = registerNode.storage.myLegalIdentityKey
val addChange = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
val addWireChange = addChange.toWire(nodeKey.private)
service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
assertEquals(2, service.nodes.count())
assertEquals(mapServiceNode.info, service.processQueryRequest(NetworkMapService.QueryIdentityRequest(mapServiceNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Re-registering should be a no-op
service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
assertEquals(2, service.nodes.count())
// Confirm that de-registering the node succeeds and drops it from the node lists
var removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val removeWireChange = removeChange.toWire(nodeKey.private)
assert(service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Trying to de-register a node that doesn't exist should fail
assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
}
class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash)
: ProtocolLogic<Unit>() {
@Suspendable
override fun call() {
val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress)
send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.address, 0, req)
}
}
class TestFetchPSM(val server: NodeInfo, val subscribe: Boolean, val ifChangedSinceVersion: Int? = null)
: ProtocolLogic<Collection<NodeRegistration>?>() {
@Suspendable
override fun call(): Collection<NodeRegistration>? {
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.FetchMapResponse>(
NetworkMapService.FETCH_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it.nodes }
}
}
class TestRegisterPSM(val server: NodeInfo, val reg: NodeRegistration, val privateKey: PrivateKey)
: ProtocolLogic<NetworkMapService.RegistrationResponse>() {
@Suspendable
override fun call(): NetworkMapService.RegistrationResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.RegistrationResponse>(
NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it }
}
}
class TestSubscribePSM(val server: NodeInfo, val subscribe: Boolean)
: ProtocolLogic<NetworkMapService.SubscribeResponse>() {
@Suspendable
override fun call(): NetworkMapService.SubscribeResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.SubscribeResponse>(
NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it }
}
}
@Test
fun successWithNetwork() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
// Confirm there's a network map service on node 0
assertNotNull(mapServiceNode.inNodeNetworkMapService)
// Confirm all nodes have registered themselves
var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(2, fetchPsm.get()?.count())
// Forcibly deregister the second node
val nodeKey = registerNode.storage.myLegalIdentityKey
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val seq = 2L
val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
network.runNetwork()
assertTrue(registerPsm.get().success)
// Now only map service node should be registered
fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
}
@Test
fun subscribeWithNetwork() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService)
// Test subscribing to updates
val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
TestSubscribePSM(mapServiceNode.info, true))
network.runNetwork()
subscribePsm.get()
// Check the unacknowledged count is zero
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address))
// Fire off an update
val nodeKey = registerNode.storage.myLegalIdentityKey
var seq = 1L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
var wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg)
// Check the unacknowledged count is one
assertEquals(1, service.getUnacknowledgedCount(registerNode.info.address))
// Send in an acknowledgment and verify the count goes down
val hash = SecureHash.sha256(wireReg.raw.bits)
val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
TestAcknowledgePSM(mapServiceNode.info, hash))
network.runNetwork()
acknowledgePsm.get()
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address))
// Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit
// is hit. On the last iteration overflow the pending list, and check the node is unsubscribed
for (i in 0..service.maxUnacknowledgedUpdates) {
reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg)
if (i < service.maxUnacknowledgedUpdates) {
assertEquals(i + 1, service.getUnacknowledgedCount(registerNode.info.address))
} else {
assertNull(service.getUnacknowledgedCount(registerNode.info.address))
}
}
}
}