Enable network map service

This commit is contained in:
Ross Nicoll 2016-04-27 14:35:59 +01:00
parent 00a2088fa5
commit 147f8f37ce
15 changed files with 412 additions and 129 deletions

View File

@ -20,7 +20,9 @@ if [[ "$mode" == "nodeA" ]]; then
RC=83
while [ $RC -eq 83 ]
do
build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost --fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public
build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost \
--fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public \
--network-map-identity-file=nodeA/identity-public --network-map-address=localhost
RC=$?
done
elif [[ "$mode" == "nodeB" ]]; then
@ -35,7 +37,9 @@ elif [[ "$mode" == "nodeB" ]]; then
RC=83
while [ $RC -eq 83 ]
do
build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 --fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public &
build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 \
--fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public \
-network-map-identity-file=nodeA/identity-public --network-map-address=localhost &
while ! curl -F rates=@scripts/example.rates.txt http://localhost:31341/upload/interest-rates; do
echo "Retry to upload interest rates to oracle after 5 seconds"
sleep 5

View File

@ -3,14 +3,18 @@ package core.node
import api.APIServer
import api.APIServerImpl
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import core.Party
import core.crypto.generateKeyPair
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.node.services.*
import core.random63BitValue
import core.serialization.deserialize
import core.serialization.serialize
import core.testing.MockNetworkMapCache
import core.utilities.AddOrRemove
import core.utilities.AffinityExecutor
import org.slf4j.Logger
@ -19,6 +23,7 @@ import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
@ -26,13 +31,24 @@ import java.util.*
* A base node implementation that can be customised either for production (with real implementations that do real
* I/O), or a mock implementation suitable for unit test environments.
*/
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val timestamperAddress: NodeInfo?,
// TODO: Where this node is the initial network map service, currently no initialNetworkMapAddress is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val initialNetworkMapAddress: NodeInfo?,
val advertisedServices: Set<ServiceType>, val platformClock: Clock) {
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
}
val networkMapServiceCallTimeout: Duration = Duration.ofSeconds(1)
// TODO: Persist this, as well as whether the node is registered.
/**
* Sequence number of changes sent to the network map service, when registering/de-registering this node
*/
var networkMapSeq: Long = 1
protected abstract val log: Logger
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
@ -84,15 +100,57 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// Build services we're advertising
if (NetworkMapService.Type in info.advertisedServices) makeNetworkMapService()
makeTimestampingService(timestamperAddress)
if (TimestamperService.Type in info.advertisedServices) makeTimestampingService()
identity = makeIdentityService()
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
startMessagingService()
require(initialNetworkMapAddress == null || NetworkMapService.Type in initialNetworkMapAddress.advertisedServices)
{ "Initial network map address must indicate a node that provides a network map service" }
configureNetworkMapCache(initialNetworkMapAddress)
return this
}
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
private fun configureNetworkMapCache(networkMapAddress: NodeInfo?) {
services.networkMapCache.addNode(info)
if (initialNetworkMapAddress != null) {
// TODO: Return a future so the caller knows these operations may not have completed yet, and can monitor
// if needed
updateRegistration(initialNetworkMapAddress, AddOrRemove.ADD)
services.networkMapCache.addMapService(this.smm, net, initialNetworkMapAddress, true, null)
}
if (inNodeNetworkMapService != null) {
// Register for updates
services.networkMapCache.addMapService(this.smm, net, info, true, null)
}
}
private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture<NetworkMapService.RegistrationResponse> {
// Register this node against the network
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
val message = net.createMessage(NetworkMapService.REGISTER_PROTOCOL_TOPIC + ".0", request.serialize().bits)
val future = SettableFuture.create<NetworkMapService.RegistrationResponse>()
val topic = NetworkMapService.REGISTER_PROTOCOL_TOPIC + "." + sessionID
net.runOnNextMessage(topic, MoreExecutors.directExecutor()) { message ->
future.set(message.data.deserialize())
}
net.send(message, serviceInfo.address)
return future
}
open protected fun makeNetworkMapService() {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
@ -100,22 +158,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
}
private fun makeTimestampingService(timestamperAddress: NodeInfo?) {
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
val tsid = if (timestamperAddress != null) {
inNodeTimestampingService = null
require(TimestamperService.Type in timestamperAddress.advertisedServices) {
"Timestamper address must indicate a node that provides timestamping services, actually " +
"has ${timestamperAddress.advertisedServices}"
}
timestamperAddress
} else {
info.advertisedServices += TimestamperService.Type
inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock)
NodeInfo(net.myAddress, storage.myLegalIdentity, setOf(TimestamperService.Type))
}
services.networkMapCache.addNode(tsid)
open protected fun makeTimestampingService() {
inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock)
}
lateinit var interestRatesService: NodeInterestRates.Service
@ -127,26 +171,32 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
protected open fun makeIdentityService(): IdentityService {
// We don't have any identity infrastructure right now, so we just throw together the only identities we
// know about: our own, the identity of the remote timestamper node (if any), plus whatever is in the
// network map.
//
val service = InMemoryIdentityService()
if (timestamperAddress != null)
service.registerIdentity(timestamperAddress.identity)
if (initialNetworkMapAddress != null)
service.registerIdentity(initialNetworkMapAddress.identity)
service.registerIdentity(storage.myLegalIdentity)
services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) }
// TODO: Subscribe to updates to the network map cache
return service
}
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
// to indicate "Please shut down gracefully" vs "Shut down now".
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process.
net.stop()
}
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun startMessagingService()
protected open fun initialiseStorageService(dir: Path): StorageService {
val attachments = makeAttachmentStorage(dir)
_servicesThatAcceptUploads += attachments
@ -198,4 +248,3 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics)
}
}

View File

@ -41,14 +41,15 @@ class ConfigurationException(message: String) : Exception(message)
* @param p2pAddr The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param configuration This is typically loaded from a .properties file
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
* @param networkMapAddress An external network map service to use. Should only ever be null when creating the first
* network map service, while bootstrapping a network.
* @param advertisedServices The services this node advertises. This must be a subset of the services it runs,
* but nodes are not required to advertise services they run (hence subset).
* @param clock The clock used within the node and by all protocols etc
*/
class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration,
timestamperAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, timestamperAddress, advertisedServices, clock) {
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
val DEFAULT_PORT = 31337
@ -66,6 +67,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, serverThread)
override fun startMessagingService() {
// Start up the MQ service.
(net as ArtemisMessagingService).start()
}
private fun initWebServer(): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
@ -117,8 +123,6 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
alreadyRunningNodeCheck()
super.start()
webServer = initWebServer()
// Start up the MQ service.
(net as ArtemisMessagingService).start()
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).

View File

@ -274,6 +274,8 @@ class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemo
return WireNodeRegistration(regSerialized, regSig)
}
override fun toString() : String = "$node #${serial} (${type})"
}
/**

View File

@ -1,6 +1,8 @@
package core.testing
import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import core.Party
import core.messaging.MessagingService
import core.messaging.SingleMessageRecipient
@ -46,19 +48,18 @@ class MockNetwork(private val threadPerNode: Boolean = false,
/** Allows customisation of how nodes are created. */
interface Factory {
fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode
}
object DefaultFactory : Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode {
return MockNode(dir, config, network, timestamperAddr, advertisedServices, id)
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNode {
return MockNode(dir, config, network, networkMapAddr, advertisedServices, id)
}
}
open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork,
withTimestamper: NodeInfo?, advertisedServices: Set<ServiceType>,
val id: Int) : AbstractNode(dir, config, withTimestamper, advertisedServices, Clock.systemUTC()) {
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, val id: Int) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) {
override val log: Logger = loggerFor<MockNode>()
override val serverThread: AffinityExecutor =
if (mockNet.threadPerNode)
@ -76,6 +77,10 @@ class MockNetwork(private val threadPerNode: Boolean = false,
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
override fun startMessagingService() {
// Nothing to do
}
// There is no need to slow down the unit tests by initialising CityDatabase
override fun findMyLocation(): PhysicalLocation? = null
@ -89,7 +94,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
}
/** Returns a started node, optionally created by the passed factory method */
fun createNode(withTimestamper: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
vararg advertisedServices: ServiceType): MockNode {
val newNode = forcedID == -1
val id = if (newNode) counter++ else forcedID
@ -102,7 +107,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
override val exportJMXto: String = ""
override val nearestCity: String = "Atlantis"
}
val node = nodeFactory.create(path, config, this, withTimestamper, advertisedServices.toSet(), id).start()
val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id).start()
_nodes.add(node)
return node
}

View File

@ -5,7 +5,10 @@ import core.node.CityDatabase
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.PhysicalLocation
import core.node.services.NetworkMapService
import core.node.services.NodeInterestRates
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.protocols.ProtocolLogic
import core.then
import core.utilities.ProgressTracker
@ -33,15 +36,15 @@ abstract class Simulation(val runAsync: Boolean,
// This puts together a mock network of SimulatedNodes.
open class SimulatedNode(dir: Path, config: NodeConfiguration, mockNet: MockNetwork,
withTimestamper: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int)
: MockNetwork.MockNode(dir, config, mockNet, withTimestamper, advertisedServices, id) {
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int) : MockNetwork.MockNode(dir, config, mockNet, networkMapAddress, advertisedServices, id) {
override fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
}
inner class BankFactory : MockNetwork.Factory {
var counter = 0
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
val letter = 'A' + counter
val city = bankLocations[counter++ % bankLocations.size]
val cfg = object : NodeConfiguration {
@ -50,52 +53,71 @@ abstract class Simulation(val runAsync: Boolean,
override val exportJMXto: String = ""
override val nearestCity: String = city
}
return SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id)
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id)
}
fun createAll(): List<SimulatedNode> = bankLocations.map { network.createNode(timestamper.info, nodeFactory = this) as SimulatedNode }
fun createAll(): List<SimulatedNode> = bankLocations.
map { network.createNode(networkMap.info, nodeFactory = this) as SimulatedNode }
}
val bankFactory = BankFactory()
object NetworkMapNodeFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(NetworkMapService.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Network Map Service Provider"
override val exportJMXto: String = ""
override val nearestCity: String = "Madrid"
}
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) { }
}
}
object TimestampingNodeFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(TimestamperService.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Timestamping Service" // A magic string recognised by the CP contract
override val exportJMXto: String = ""
override val nearestCity: String = "Zurich"
}
return SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id)
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id)
}
}
object RatesOracleFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
require(advertisedServices.contains(NodeInterestRates.Type))
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Rates Service Provider"
override val exportJMXto: String = ""
override val nearestCity: String = "Madrid"
}
val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) {
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) {
override fun makeInterestRatesOracleService() {
super.makeInterestRatesOracleService()
interestRatesService.upload(javaClass.getResourceAsStream("example.rates.txt"))
}
}
return n
}
}
object RegulatorFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Regulator A"
override val exportJMXto: String = ""
override val nearestCity: String = "Paris"
}
val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) {
val n = object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) {
// TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request.
// So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it.
// But that's fine for visualisation purposes.
@ -107,20 +129,19 @@ abstract class Simulation(val runAsync: Boolean,
val network = MockNetwork(false)
val regulators: List<SimulatedNode> = listOf(network.createNode(null, nodeFactory = RegulatorFactory) as SimulatedNode)
val timestamper: SimulatedNode = network.createNode(null, nodeFactory = TimestampingNodeFactory) as SimulatedNode
val ratesOracle: SimulatedNode = network.createNode(null, nodeFactory = RatesOracleFactory) as SimulatedNode
val networkMap: SimulatedNode
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
val timestamper: SimulatedNode
= network.createNode(null, nodeFactory = TimestampingNodeFactory, advertisedServices = TimestamperService.Type) as SimulatedNode
val ratesOracle: SimulatedNode
= network.createNode(null, nodeFactory = RatesOracleFactory, advertisedServices = NodeInterestRates.Type) as SimulatedNode
val serviceProviders: List<SimulatedNode> = listOf(timestamper, ratesOracle)
val banks: List<SimulatedNode> = bankFactory.createAll()
init {
// Now wire up the network maps for each node.
// TODO: This is obviously bogus: there should be a single network map for the whole simulated network.
for (node in regulators + serviceProviders + banks) {
val cache = (node.services.networkMapCache as MockNetworkMapCache)
cache.addRegistration(ratesOracle.info)
regulators.forEach { regulator ->
cache.addRegistration(regulator.info)
}
node.services.networkMapCache.addNode(node.info)
}
}

View File

@ -8,12 +8,8 @@ import core.node.Node
import core.node.NodeConfiguration
import core.node.NodeConfigurationFromConfig
import core.node.NodeInfo
import core.node.services.ArtemisMessagingService
import core.node.services.NodeInterestRates
import core.node.services.ServiceType
import core.node.services.TimestamperService
import core.node.services.*
import core.serialization.deserialize
import core.testing.MockNetworkMapCache
import core.utilities.BriefLogFormatter
import demos.protocols.AutoOfferProtocol
import demos.protocols.ExitServerProtocol
@ -22,7 +18,6 @@ import joptsimple.OptionParser
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.*
import kotlin.system.exitProcess
// IRS DEMO
@ -34,15 +29,8 @@ fun main(args: Array<String>) {
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("nodedata")
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").withRequiredArg().required()
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
val rateOracleIdentityFile = parser.accepts("rates-oracle-identity-file").withRequiredArg().required()
val rateOracleNetAddr = parser.accepts("rates-oracle-address").requiredIf(rateOracleIdentityFile).withRequiredArg()
val networkMapIdentityFile = parser.accepts("network-map-identity-file").withRequiredArg()
val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg()
// Use these to list one or more peers (again, will be superseded by discovery implementation)
val fakeTradeWithAddr = parser.accepts("fake-trade-with-address").withRequiredArg().required()
@ -67,42 +55,26 @@ fun main(args: Array<String>) {
}
val config = loadConfigFile(configFile)
val advertisedServices = HashSet<ServiceType>()
val advertisedServices: Set<ServiceType>
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(Node.DEFAULT_PORT)
// The timestamping node runs in the same process as the one that passes null to Node constructor.
val timestamperId = if (options.valueOf(timestamperNetAddr).equals(options.valueOf(networkAddressArg))) {
// This node provides timestamping services
advertisedServices.add(TimestamperService.Type)
val networkMapId = if (options.valueOf(networkMapNetAddr).equals(options.valueOf(networkAddressArg))) {
// This node provides network map and timestamping services
advertisedServices = setOf(NetworkMapService.Type, TimestamperService.Type)
null
} else {
advertisedServices = setOf(NodeInterestRates.Type)
try {
nodeInfo(options.valueOf(timestamperNetAddr), options.valueOf(timestamperIdentityFile), setOf(TimestamperService.Type))
nodeInfo(options.valueOf(networkMapNetAddr), options.valueOf(networkMapIdentityFile), setOf(NetworkMapService.Type, TimestamperService.Type))
} catch (e: Exception) {
null
}
}
// The timestamping node runs in the same process as the one that passes null to Node constructor.
val rateOracleId = if (options.valueOf(rateOracleNetAddr).equals(options.valueOf(networkAddressArg))) {
advertisedServices.add(NodeInterestRates.Type)
null
} else {
try {
nodeInfo(options.valueOf(rateOracleNetAddr), options.valueOf(rateOracleIdentityFile), setOf(NodeInterestRates.Type))
} catch (e: Exception) {
null
}
}
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, advertisedServices, DemoClock()).start() }
// Add self to network map
node.services.networkMapCache.addNode(node.info)
// Add rates oracle to network map if one has been specified
rateOracleId?.let { node.services.networkMapCache.addNode(it) }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices, DemoClock()).start() }
// TODO: This should all be replaced by the identity service being updated
// as the network map changes.
val hostAndPortStrings = options.valuesOf(fakeTradeWithAddr)
val identityFiles = options.valuesOf(fakeTradeWithIdentityFile)
if (hostAndPortStrings.size != identityFiles.size) {
@ -111,9 +83,9 @@ fun main(args: Array<String>) {
for ((hostAndPortString, identityFile) in hostAndPortStrings.zip(identityFiles)) {
try {
val peerId = nodeInfo(hostAndPortString, identityFile)
(node.services.networkMapCache as MockNetworkMapCache).addRegistration(peerId)
node.services.identityService.registerIdentity(peerId.identity)
} catch (e: Exception) {
println("Could not load peer identity file \"${identityFile}\".")
}
}

View File

@ -27,6 +27,8 @@ fun main(args: Array<String>) {
val parser = OptionParser()
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data")
val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required()
val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required()
val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required()
val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required()
@ -50,6 +52,10 @@ fun main(args: Array<String>) {
Files.createDirectory(dir)
}
val networkMapAddr = ArtemisMessagingService.makeRecipient(options.valueOf(networkMapAddrArg))
val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>()
val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity)
// Load oracle stuff (in lieu of having a network map service)
val oracleAddr = ArtemisMessagingService.makeRecipient(options.valueOf(oracleAddrArg))
val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>()
@ -67,7 +73,8 @@ fun main(args: Array<String>) {
override val exportJMXto: String = "http"
override val nearestCity: String = "Atlantis"
}
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, null, advertisedServices).start() }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapAddress, advertisedServices).start() }
// Make a garbage transaction that includes a rate fix.
val tx = TransactionBuilder()

View File

@ -45,12 +45,8 @@ fun main(args: Array<String>) {
val serviceFakeTradesArg = parser.accepts("service-fake-trades")
val fakeTradeWithArg = parser.accepts("fake-trade-with").requiredUnless(serviceFakeTradesArg).withRequiredArg()
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
val networkMapIdentityFile = parser.accepts("network-map-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg()
val options = try {
parser.parse(*args)
@ -81,15 +77,14 @@ fun main(args: Array<String>) {
exitProcess(1)
}
// The timestamping node runs in the same process as the buyer protocol is run.
val timestamperId = if (options.has(timestamperIdentityFile)) {
val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(Node.DEFAULT_PORT)
val path = Paths.get(options.valueOf(timestamperIdentityFile))
val networkMapId = if (options.has(networkMapIdentityFile)) {
val addr = HostAndPort.fromString(options.valueOf(networkMapNetAddr)).withDefaultPort(Node.DEFAULT_PORT)
val path = Paths.get(options.valueOf(networkMapIdentityFile))
val party = Files.readAllBytes(path).deserialize<Party>()
NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, advertisedServices = setOf(TimestamperService.Type))
NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, setOf(NetworkMapService.Type))
} else null
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, advertisedServices).start() }
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices).start() }
if (listening) {
// For demo purposes just extract attachment jars when saved to disk, so the user can explore them.

View File

@ -57,7 +57,7 @@ object AutoOfferProtocol {
progressTracker.currentStep = DEALING
// TODO required as messaging layer does not currently queue messages that arrive before we expect them
Thread.sleep(100)
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.timestamperAddress!!,
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.services.networkMapCache.timestampingNodes.first(),
autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.childrenFor[DEALING]!!)
val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller)
// This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it
@ -93,12 +93,15 @@ object AutoOfferProtocol {
@Suspendable
override fun call(): SignedTransaction {
require(serviceHub.networkMapCache.timestampingNodes.isNotEmpty()) { "No timestamping nodes registered" }
val ourSessionID = random63BitValue()
val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes[0]
val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes.first()
// need to pick which ever party is not us
val otherParty = notUs(*dealToBeOffered.parties).single()
val otherSide = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name))!!.address
val otherNode = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name))
requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } }
val otherSide = otherNode!!.address
progressTracker.currentStep = ANNOUNCING
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, ourSessionID, dealToBeOffered))
progressTracker.currentStep = DEALING

View File

@ -96,7 +96,8 @@ class MockServices(
val net: MessagingService? = null,
val identity: IdentityService? = MockIdentityService,
val storage: StorageService? = MockStorageService(),
val networkMap: NetworkMapCache? = MockNetworkMapCache(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = Clock.systemUTC()
) : ServiceHub {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
@ -108,7 +109,7 @@ class MockServices(
override val networkService: MessagingService
get() = net ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache
get() = networkMap ?: throw UnsupportedOperationException()
get() = mapCache ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
override val clock: Clock

View File

@ -5,6 +5,7 @@ import core.crypto.SecureHash
import core.crypto.sha256
import core.node.NodeConfiguration
import core.node.NodeInfo
import core.node.services.NetworkMapService
import core.node.services.NodeAttachmentService
import core.node.services.ServiceType
import core.node.services.TimestamperService
@ -87,10 +88,10 @@ class AttachmentTests {
@Test
fun maliciousResponse() {
// Make a node that doesn't do sanity checking at load time.
val n0 = network.createNode(null, nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?,
val n0 = network.createNode(null, -1, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, id) {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) {
override fun start(): MockNetwork.MockNode {
super.start()
(storage.attachments as NodeAttachmentService).checkAttachmentsOnLoad = false
@ -98,7 +99,7 @@ class AttachmentTests {
}
}
}
}, advertisedServices = TimestamperService.Type)
}, NetworkMapService.Type, TimestamperService.Type)
val n1 = network.createNode(n0.info)
// Insert an attachment into node zero's store directly.

View File

@ -98,8 +98,12 @@ class TwoPartyTradeProtocolTests {
var (aliceNode, bobNode) = net.createTwoNodes()
val aliceAddr = aliceNode.net.myAddress
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle
val networkMapAddr = aliceNode.info
val timestamperAddr = aliceNode.info
// Clear network map registration messages through before we start
net.runNetwork()
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(2000.DOLLARS)
val alicesFakePaper = fillUpForSeller(false, timestamperAddr.identity, null).second
@ -153,10 +157,10 @@ class TwoPartyTradeProtocolTests {
// ... bring the node back up ... the act of constructing the SMM will re-register the message handlers
// that Bob was waiting on before the reboot occurred.
bobNode = net.createNode(timestamperAddr, bobAddr.id, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?,
bobNode = net.createNode(networkMapAddr, bobAddr.id, object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, bobAddr.id) {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, bobAddr.id) {
override fun initialiseStorageService(dir: Path): StorageService {
val ss = super.initialiseStorageService(dir)
val smMap = ss.stateMachines
@ -184,11 +188,12 @@ class TwoPartyTradeProtocolTests {
// Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order
// of gets and puts.
private fun makeNodeWithTracking(name: String): MockNetwork.MockNode {
val networkMapAddr: NodeInfo? = null
// Create a node in the mock network ...
return net.createNode(nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?,
return net.createNode(null, nodeFactory = object : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int): MockNetwork.MockNode {
return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, id) {
return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) {
// That constructs the storage service object in a customised way ...
override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl {
// To use RecordingMaps instead of ordinary HashMaps.

View File

@ -0,0 +1,23 @@
package core.node.services
import core.testing.MockNetwork
import org.junit.Before
import org.junit.Test
class InMemoryNetworkMapCacheTest {
lateinit var network: MockNetwork
@Before
fun setup() {
network = MockNetwork()
}
@Test
fun registerWithNetwork() {
val (n0, n1) = network.createTwoNodes()
val future = n1.services.networkMapCache.addMapService(n1.smm, n1.net, n0.info, false, null)
network.runNetwork()
future.get()
}
}

View File

@ -0,0 +1,191 @@
package core.node.services
import co.paralleluniverse.fibers.Suspendable
import core.*
import core.crypto.SecureHash
import core.crypto.signWithECDSA
import core.node.NodeInfo
import core.protocols.ProtocolLogic
import core.serialization.serialize
import core.testing.MockNetwork
import core.utilities.AddOrRemove
import core.utilities.BriefLogFormatter
import org.junit.Before
import org.junit.Test
import java.security.PrivateKey
import java.time.Instant
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
class InMemoryNetworkMapServiceTest {
lateinit var network: MockNetwork
init {
BriefLogFormatter.init()
}
@Before
fun setup() {
network = MockNetwork()
}
/**
* Perform basic tests of registering, de-registering and fetching the full network map.
*/
@Test
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService
// Confirm the service contains only its own node
assertEquals(1, service.nodes.count())
assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Register the second node
var seq = 1L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val nodeKey = registerNode.storage.myLegalIdentityKey
val addChange = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
val addWireChange = addChange.toWire(nodeKey.private)
service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
assertEquals(2, service.nodes.count())
assertEquals(mapServiceNode.info, service.processQueryRequest(NetworkMapService.QueryIdentityRequest(mapServiceNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Re-registering should be a no-op
service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
assertEquals(2, service.nodes.count())
// Confirm that de-registering the node succeeds and drops it from the node lists
var removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val removeWireChange = removeChange.toWire(nodeKey.private)
assert(service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Trying to de-register a node that doesn't exist should fail
assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
}
class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash)
: ProtocolLogic<Unit>() {
@Suspendable
override fun call() {
val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress)
send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.address, 0, req)
}
}
class TestFetchPSM(val server: NodeInfo, val subscribe: Boolean, val ifChangedSinceVersion: Int? = null)
: ProtocolLogic<Collection<NodeRegistration>?>() {
@Suspendable
override fun call(): Collection<NodeRegistration>? {
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.FetchMapResponse>(
NetworkMapService.FETCH_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it.nodes }
}
}
class TestRegisterPSM(val server: NodeInfo, val reg: NodeRegistration, val privateKey: PrivateKey)
: ProtocolLogic<NetworkMapService.RegistrationResponse>() {
@Suspendable
override fun call(): NetworkMapService.RegistrationResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.RegistrationResponse>(
NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it }
}
}
class TestSubscribePSM(val server: NodeInfo, val subscribe: Boolean)
: ProtocolLogic<NetworkMapService.SubscribeResponse>() {
@Suspendable
override fun call(): NetworkMapService.SubscribeResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.SubscribeResponse>(
NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
.validate { it }
}
}
@Test
fun successWithNetwork() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
// Confirm there's a network map service on node 0
assertNotNull(mapServiceNode.inNodeNetworkMapService)
// Confirm all nodes have registered themselves
var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(2, fetchPsm.get()?.count())
// Forcibly deregister the second node
val nodeKey = registerNode.storage.myLegalIdentityKey
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val seq = 2L
val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
network.runNetwork()
assertTrue(registerPsm.get().success)
// Now only map service node should be registered
fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
}
@Test
fun subscribeWithNetwork() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService)
// Test subscribing to updates
val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
TestSubscribePSM(mapServiceNode.info, true))
network.runNetwork()
subscribePsm.get()
// Check the unacknowledged count is zero
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address))
// Fire off an update
val nodeKey = registerNode.storage.myLegalIdentityKey
var seq = 1L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
var wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg)
// Check the unacknowledged count is one
assertEquals(1, service.getUnacknowledgedCount(registerNode.info.address))
// Send in an acknowledgment and verify the count goes down
val hash = SecureHash.sha256(wireReg.raw.bits)
val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
TestAcknowledgePSM(mapServiceNode.info, hash))
network.runNetwork()
acknowledgePsm.get()
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address))
// Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit
// is hit. On the last iteration overflow the pending list, and check the node is unsubscribed
for (i in 0..service.maxUnacknowledgedUpdates) {
reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg)
if (i < service.maxUnacknowledgedUpdates) {
assertEquals(i + 1, service.getUnacknowledgedCount(registerNode.info.address))
} else {
assertNull(service.getUnacknowledgedCount(registerNode.info.address))
}
}
}
}