diff --git a/scripts/irs-demo.sh b/scripts/irs-demo.sh index 0774190608..0b7a50599d 100755 --- a/scripts/irs-demo.sh +++ b/scripts/irs-demo.sh @@ -20,7 +20,9 @@ if [[ "$mode" == "nodeA" ]]; then RC=83 while [ $RC -eq 83 ] do - build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost --fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public + build/install/r3prototyping/bin/irsdemo --dir=nodeA --network-address=localhost \ + --fake-trade-with-address=localhost:31340 --fake-trade-with-identity=nodeB/identity-public \ + --network-map-identity-file=nodeA/identity-public --network-map-address=localhost RC=$? done elif [[ "$mode" == "nodeB" ]]; then @@ -35,7 +37,9 @@ elif [[ "$mode" == "nodeB" ]]; then RC=83 while [ $RC -eq 83 ] do - build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 --fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public --timestamper-identity-file=nodeA/identity-public --timestamper-address=localhost --rates-oracle-address=localhost:31340 --rates-oracle-identity-file=nodeB/identity-public & + build/install/r3prototyping/bin/irsdemo --dir=nodeB --network-address=localhost:31340 \ + --fake-trade-with-address=localhost --fake-trade-with-identity=nodeA/identity-public \ + -network-map-identity-file=nodeA/identity-public --network-map-address=localhost & while ! curl -F rates=@scripts/example.rates.txt http://localhost:31341/upload/interest-rates; do echo "Retry to upload interest rates to oracle after 5 seconds" sleep 5 diff --git a/src/main/kotlin/core/node/AbstractNode.kt b/src/main/kotlin/core/node/AbstractNode.kt index 9fd207b119..684e858727 100644 --- a/src/main/kotlin/core/node/AbstractNode.kt +++ b/src/main/kotlin/core/node/AbstractNode.kt @@ -3,14 +3,18 @@ package core.node import api.APIServer import api.APIServerImpl import com.codahale.metrics.MetricRegistry +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.SettableFuture import core.Party import core.crypto.generateKeyPair import core.messaging.MessagingService import core.messaging.StateMachineManager +import core.messaging.runOnNextMessage import core.node.services.* +import core.random63BitValue import core.serialization.deserialize import core.serialization.serialize -import core.testing.MockNetworkMapCache import core.utilities.AddOrRemove import core.utilities.AffinityExecutor import org.slf4j.Logger @@ -19,6 +23,7 @@ import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair import java.time.Clock +import java.time.Duration import java.time.Instant import java.util.* @@ -26,13 +31,24 @@ import java.util.* * A base node implementation that can be customised either for production (with real implementations that do real * I/O), or a mock implementation suitable for unit test environments. */ -abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val timestamperAddress: NodeInfo?, +// TODO: Where this node is the initial network map service, currently no initialNetworkMapAddress is provided. +// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the +// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in. +abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val initialNetworkMapAddress: NodeInfo?, val advertisedServices: Set, val platformClock: Clock) { companion object { val PRIVATE_KEY_FILE_NAME = "identity-private-key" val PUBLIC_IDENTITY_FILE_NAME = "identity-public" } + val networkMapServiceCallTimeout: Duration = Duration.ofSeconds(1) + + // TODO: Persist this, as well as whether the node is registered. + /** + * Sequence number of changes sent to the network map service, when registering/de-registering this node + */ + var networkMapSeq: Long = 1 + protected abstract val log: Logger // We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the @@ -84,15 +100,57 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // Build services we're advertising if (NetworkMapService.Type in info.advertisedServices) makeNetworkMapService() - makeTimestampingService(timestamperAddress) + if (TimestamperService.Type in info.advertisedServices) makeTimestampingService() + identity = makeIdentityService() // This object doesn't need to be referenced from this class because it registers handlers on the network // service and so that keeps it from being collected. DataVendingService(net, storage) + startMessagingService() + + require(initialNetworkMapAddress == null || NetworkMapService.Type in initialNetworkMapAddress.advertisedServices) + { "Initial network map address must indicate a node that provides a network map service" } + configureNetworkMapCache(initialNetworkMapAddress) + return this } + /** + * Register this node with the network map cache, and load network map from a remote service (and register for + * updates) if one has been supplied. + */ + private fun configureNetworkMapCache(networkMapAddress: NodeInfo?) { + services.networkMapCache.addNode(info) + if (initialNetworkMapAddress != null) { + // TODO: Return a future so the caller knows these operations may not have completed yet, and can monitor + // if needed + updateRegistration(initialNetworkMapAddress, AddOrRemove.ADD) + services.networkMapCache.addMapService(this.smm, net, initialNetworkMapAddress, true, null) + } + if (inNodeNetworkMapService != null) { + // Register for updates + services.networkMapCache.addMapService(this.smm, net, info, true, null) + } + } + + private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture { + // Register this node against the network + val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val reg = NodeRegistration(info, networkMapSeq++, type, expires) + val sessionID = random63BitValue() + val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) + val message = net.createMessage(NetworkMapService.REGISTER_PROTOCOL_TOPIC + ".0", request.serialize().bits) + val future = SettableFuture.create() + val topic = NetworkMapService.REGISTER_PROTOCOL_TOPIC + "." + sessionID + + net.runOnNextMessage(topic, MoreExecutors.directExecutor()) { message -> + future.set(message.data.deserialize()) + } + net.send(message, serviceInfo.address) + + return future + } open protected fun makeNetworkMapService() { val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD @@ -100,22 +158,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache) } - private fun makeTimestampingService(timestamperAddress: NodeInfo?) { - // Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are - // given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping. - val tsid = if (timestamperAddress != null) { - inNodeTimestampingService = null - require(TimestamperService.Type in timestamperAddress.advertisedServices) { - "Timestamper address must indicate a node that provides timestamping services, actually " + - "has ${timestamperAddress.advertisedServices}" - } - timestamperAddress - } else { - info.advertisedServices += TimestamperService.Type - inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock) - NodeInfo(net.myAddress, storage.myLegalIdentity, setOf(TimestamperService.Type)) - } - services.networkMapCache.addNode(tsid) + open protected fun makeTimestampingService() { + inNodeTimestampingService = NodeTimestamperService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, platformClock) } lateinit var interestRatesService: NodeInterestRates.Service @@ -127,26 +171,32 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, } protected open fun makeIdentityService(): IdentityService { - // We don't have any identity infrastructure right now, so we just throw together the only identities we - // know about: our own, the identity of the remote timestamper node (if any), plus whatever is in the - // network map. - // val service = InMemoryIdentityService() - if (timestamperAddress != null) - service.registerIdentity(timestamperAddress.identity) + if (initialNetworkMapAddress != null) + service.registerIdentity(initialNetworkMapAddress.identity) service.registerIdentity(storage.myLegalIdentity) services.networkMapCache.partyNodes.forEach { service.registerIdentity(it.identity) } + // TODO: Subscribe to updates to the network map cache + return service } open fun stop() { + // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the + // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() + // to indicate "Please shut down gracefully" vs "Shut down now". + // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it + // unsubscribes us forcibly, rather than blocking the shutdown process. + net.stop() } protected abstract fun makeMessagingService(): MessagingService + protected abstract fun startMessagingService() + protected open fun initialiseStorageService(dir: Path): StorageService { val attachments = makeAttachmentStorage(dir) _servicesThatAcceptUploads += attachments @@ -198,4 +248,3 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics) } } - diff --git a/src/main/kotlin/core/node/Node.kt b/src/main/kotlin/core/node/Node.kt index 73c929f3fb..3a6f4c9bb4 100644 --- a/src/main/kotlin/core/node/Node.kt +++ b/src/main/kotlin/core/node/Node.kt @@ -41,14 +41,15 @@ class ConfigurationException(message: String) : Exception(message) * @param p2pAddr The host and port that this server will use. It can't find out its own external hostname, so you * have to specify that yourself. * @param configuration This is typically loaded from a .properties file - * @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one. + * @param networkMapAddress An external network map service to use. Should only ever be null when creating the first + * network map service, while bootstrapping a network. * @param advertisedServices The services this node advertises. This must be a subset of the services it runs, * but nodes are not required to advertise services they run (hence subset). * @param clock The clock used within the node and by all protocols etc */ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration, - timestamperAddress: NodeInfo?, advertisedServices: Set, - clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, timestamperAddress, advertisedServices, clock) { + networkMapAddress: NodeInfo?, advertisedServices: Set, + clock: Clock = Clock.systemUTC()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) { companion object { /** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */ val DEFAULT_PORT = 31337 @@ -66,6 +67,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, serverThread) + override fun startMessagingService() { + // Start up the MQ service. + (net as ArtemisMessagingService).start() + } + private fun initWebServer(): Server { // Note that the web server handlers will all run concurrently, and not on the node thread. @@ -117,8 +123,6 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration alreadyRunningNodeCheck() super.start() webServer = initWebServer() - // Start up the MQ service. - (net as ArtemisMessagingService).start() // Begin exporting our own metrics via JMX. JmxReporter. forRegistry(services.monitoringService.metrics). diff --git a/src/main/kotlin/core/node/services/NetworkMapService.kt b/src/main/kotlin/core/node/services/NetworkMapService.kt index f17427b32c..7d6f40cb0a 100644 --- a/src/main/kotlin/core/node/services/NetworkMapService.kt +++ b/src/main/kotlin/core/node/services/NetworkMapService.kt @@ -274,6 +274,8 @@ class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemo return WireNodeRegistration(regSerialized, regSig) } + + override fun toString() : String = "$node #${serial} (${type})" } /** diff --git a/src/main/kotlin/core/testing/MockNode.kt b/src/main/kotlin/core/testing/MockNode.kt index 66a3ff3e78..15b93bb07a 100644 --- a/src/main/kotlin/core/testing/MockNode.kt +++ b/src/main/kotlin/core/testing/MockNode.kt @@ -1,6 +1,8 @@ package core.testing import com.google.common.jimfs.Jimfs +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture import core.Party import core.messaging.MessagingService import core.messaging.SingleMessageRecipient @@ -46,19 +48,18 @@ class MockNetwork(private val threadPerNode: Boolean = false, /** Allows customisation of how nodes are created. */ interface Factory { fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, - timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNode + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNode } object DefaultFactory : Factory { override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, - timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNode { - return MockNode(dir, config, network, timestamperAddr, advertisedServices, id) + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNode { + return MockNode(dir, config, network, networkMapAddr, advertisedServices, id) } } open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork, - withTimestamper: NodeInfo?, advertisedServices: Set, - val id: Int) : AbstractNode(dir, config, withTimestamper, advertisedServices, Clock.systemUTC()) { + networkMapAddr: NodeInfo?, advertisedServices: Set, val id: Int) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) { override val log: Logger = loggerFor() override val serverThread: AffinityExecutor = if (mockNet.threadPerNode) @@ -76,6 +77,10 @@ class MockNetwork(private val threadPerNode: Boolean = false, override fun makeIdentityService() = MockIdentityService(mockNet.identities) + override fun startMessagingService() { + // Nothing to do + } + // There is no need to slow down the unit tests by initialising CityDatabase override fun findMyLocation(): PhysicalLocation? = null @@ -89,7 +94,7 @@ class MockNetwork(private val threadPerNode: Boolean = false, } /** Returns a started node, optionally created by the passed factory method */ - fun createNode(withTimestamper: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory, + fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory, vararg advertisedServices: ServiceType): MockNode { val newNode = forcedID == -1 val id = if (newNode) counter++ else forcedID @@ -102,7 +107,7 @@ class MockNetwork(private val threadPerNode: Boolean = false, override val exportJMXto: String = "" override val nearestCity: String = "Atlantis" } - val node = nodeFactory.create(path, config, this, withTimestamper, advertisedServices.toSet(), id).start() + val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id).start() _nodes.add(node) return node } diff --git a/src/main/kotlin/core/testing/Simulation.kt b/src/main/kotlin/core/testing/Simulation.kt index c629e5cb41..ecf8594be3 100644 --- a/src/main/kotlin/core/testing/Simulation.kt +++ b/src/main/kotlin/core/testing/Simulation.kt @@ -5,7 +5,10 @@ import core.node.CityDatabase import core.node.NodeConfiguration import core.node.NodeInfo import core.node.PhysicalLocation +import core.node.services.NetworkMapService +import core.node.services.NodeInterestRates import core.node.services.ServiceType +import core.node.services.TimestamperService import core.protocols.ProtocolLogic import core.then import core.utilities.ProgressTracker @@ -33,15 +36,15 @@ abstract class Simulation(val runAsync: Boolean, // This puts together a mock network of SimulatedNodes. open class SimulatedNode(dir: Path, config: NodeConfiguration, mockNet: MockNetwork, - withTimestamper: NodeInfo?, advertisedServices: Set, id: Int) - : MockNetwork.MockNode(dir, config, mockNet, withTimestamper, advertisedServices, id) { + networkMapAddress: NodeInfo?, advertisedServices: Set, id: Int) : MockNetwork.MockNode(dir, config, mockNet, networkMapAddress, advertisedServices, id) { override fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity] } inner class BankFactory : MockNetwork.Factory { var counter = 0 - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { val letter = 'A' + counter val city = bankLocations[counter++ % bankLocations.size] val cfg = object : NodeConfiguration { @@ -50,52 +53,71 @@ abstract class Simulation(val runAsync: Boolean, override val exportJMXto: String = "" override val nearestCity: String = city } - return SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) + return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) } - fun createAll(): List = bankLocations.map { network.createNode(timestamper.info, nodeFactory = this) as SimulatedNode } + fun createAll(): List = bankLocations. + map { network.createNode(networkMap.info, nodeFactory = this) as SimulatedNode } } val bankFactory = BankFactory() + object NetworkMapNodeFactory : MockNetwork.Factory { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + require(advertisedServices.contains(NetworkMapService.Type)) + val cfg = object : NodeConfiguration { + override val myLegalName: String = "Network Map Service Provider" + override val exportJMXto: String = "" + override val nearestCity: String = "Madrid" + } + + return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) { } + } + } + object TimestampingNodeFactory : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + require(advertisedServices.contains(TimestamperService.Type)) val cfg = object : NodeConfiguration { override val myLegalName: String = "Timestamping Service" // A magic string recognised by the CP contract override val exportJMXto: String = "" override val nearestCity: String = "Zurich" } - return SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) + return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) } } object RatesOracleFactory : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + require(advertisedServices.contains(NodeInterestRates.Type)) val cfg = object : NodeConfiguration { override val myLegalName: String = "Rates Service Provider" override val exportJMXto: String = "" override val nearestCity: String = "Madrid" } - val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) { + return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) { override fun makeInterestRatesOracleService() { super.makeInterestRatesOracleService() interestRatesService.upload(javaClass.getResourceAsStream("example.rates.txt")) } } - return n } } object RegulatorFactory : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, + networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { val cfg = object : NodeConfiguration { override val myLegalName: String = "Regulator A" override val exportJMXto: String = "" override val nearestCity: String = "Paris" } - val n = object : SimulatedNode(dir, cfg, network, timestamperAddr, advertisedServices, id) { + val n = object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id) { // TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request. // So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it. // But that's fine for visualisation purposes. @@ -107,20 +129,19 @@ abstract class Simulation(val runAsync: Boolean, val network = MockNetwork(false) val regulators: List = listOf(network.createNode(null, nodeFactory = RegulatorFactory) as SimulatedNode) - val timestamper: SimulatedNode = network.createNode(null, nodeFactory = TimestampingNodeFactory) as SimulatedNode - val ratesOracle: SimulatedNode = network.createNode(null, nodeFactory = RatesOracleFactory) as SimulatedNode + val networkMap: SimulatedNode + = network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode + val timestamper: SimulatedNode + = network.createNode(null, nodeFactory = TimestampingNodeFactory, advertisedServices = TimestamperService.Type) as SimulatedNode + val ratesOracle: SimulatedNode + = network.createNode(null, nodeFactory = RatesOracleFactory, advertisedServices = NodeInterestRates.Type) as SimulatedNode val serviceProviders: List = listOf(timestamper, ratesOracle) val banks: List = bankFactory.createAll() init { // Now wire up the network maps for each node. - // TODO: This is obviously bogus: there should be a single network map for the whole simulated network. for (node in regulators + serviceProviders + banks) { - val cache = (node.services.networkMapCache as MockNetworkMapCache) - cache.addRegistration(ratesOracle.info) - regulators.forEach { regulator -> - cache.addRegistration(regulator.info) - } + node.services.networkMapCache.addNode(node.info) } } diff --git a/src/main/kotlin/demos/IRSDemo.kt b/src/main/kotlin/demos/IRSDemo.kt index c847e56d8f..c5cb5f311f 100644 --- a/src/main/kotlin/demos/IRSDemo.kt +++ b/src/main/kotlin/demos/IRSDemo.kt @@ -8,12 +8,8 @@ import core.node.Node import core.node.NodeConfiguration import core.node.NodeConfigurationFromConfig import core.node.NodeInfo -import core.node.services.ArtemisMessagingService -import core.node.services.NodeInterestRates -import core.node.services.ServiceType -import core.node.services.TimestamperService +import core.node.services.* import core.serialization.deserialize -import core.testing.MockNetworkMapCache import core.utilities.BriefLogFormatter import demos.protocols.AutoOfferProtocol import demos.protocols.ExitServerProtocol @@ -22,7 +18,6 @@ import joptsimple.OptionParser import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths -import java.util.* import kotlin.system.exitProcess // IRS DEMO @@ -34,15 +29,8 @@ fun main(args: Array) { val networkAddressArg = parser.accepts("network-address").withRequiredArg().required() val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("nodedata") - // Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the - // network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity - // is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly - // to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility). - val timestamperIdentityFile = parser.accepts("timestamper-identity-file").withRequiredArg().required() - val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg() - - val rateOracleIdentityFile = parser.accepts("rates-oracle-identity-file").withRequiredArg().required() - val rateOracleNetAddr = parser.accepts("rates-oracle-address").requiredIf(rateOracleIdentityFile).withRequiredArg() + val networkMapIdentityFile = parser.accepts("network-map-identity-file").withRequiredArg() + val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg() // Use these to list one or more peers (again, will be superseded by discovery implementation) val fakeTradeWithAddr = parser.accepts("fake-trade-with-address").withRequiredArg().required() @@ -67,42 +55,26 @@ fun main(args: Array) { } val config = loadConfigFile(configFile) - val advertisedServices = HashSet() + val advertisedServices: Set val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(Node.DEFAULT_PORT) - // The timestamping node runs in the same process as the one that passes null to Node constructor. - val timestamperId = if (options.valueOf(timestamperNetAddr).equals(options.valueOf(networkAddressArg))) { - // This node provides timestamping services - advertisedServices.add(TimestamperService.Type) + val networkMapId = if (options.valueOf(networkMapNetAddr).equals(options.valueOf(networkAddressArg))) { + // This node provides network map and timestamping services + advertisedServices = setOf(NetworkMapService.Type, TimestamperService.Type) null } else { + advertisedServices = setOf(NodeInterestRates.Type) try { - nodeInfo(options.valueOf(timestamperNetAddr), options.valueOf(timestamperIdentityFile), setOf(TimestamperService.Type)) + nodeInfo(options.valueOf(networkMapNetAddr), options.valueOf(networkMapIdentityFile), setOf(NetworkMapService.Type, TimestamperService.Type)) } catch (e: Exception) { null } } - // The timestamping node runs in the same process as the one that passes null to Node constructor. - val rateOracleId = if (options.valueOf(rateOracleNetAddr).equals(options.valueOf(networkAddressArg))) { - advertisedServices.add(NodeInterestRates.Type) - null - } else { - try { - nodeInfo(options.valueOf(rateOracleNetAddr), options.valueOf(rateOracleIdentityFile), setOf(NodeInterestRates.Type)) - } catch (e: Exception) { - null - } - } - - val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, advertisedServices, DemoClock()).start() } - - // Add self to network map - node.services.networkMapCache.addNode(node.info) - - // Add rates oracle to network map if one has been specified - rateOracleId?.let { node.services.networkMapCache.addNode(it) } + val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices, DemoClock()).start() } + // TODO: This should all be replaced by the identity service being updated + // as the network map changes. val hostAndPortStrings = options.valuesOf(fakeTradeWithAddr) val identityFiles = options.valuesOf(fakeTradeWithIdentityFile) if (hostAndPortStrings.size != identityFiles.size) { @@ -111,9 +83,9 @@ fun main(args: Array) { for ((hostAndPortString, identityFile) in hostAndPortStrings.zip(identityFiles)) { try { val peerId = nodeInfo(hostAndPortString, identityFile) - (node.services.networkMapCache as MockNetworkMapCache).addRegistration(peerId) node.services.identityService.registerIdentity(peerId.identity) } catch (e: Exception) { + println("Could not load peer identity file \"${identityFile}\".") } } diff --git a/src/main/kotlin/demos/RateFixDemo.kt b/src/main/kotlin/demos/RateFixDemo.kt index 5b002bd2a1..c70cc7a9ba 100644 --- a/src/main/kotlin/demos/RateFixDemo.kt +++ b/src/main/kotlin/demos/RateFixDemo.kt @@ -27,6 +27,8 @@ fun main(args: Array) { val parser = OptionParser() val networkAddressArg = parser.accepts("network-address").withRequiredArg().required() val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data") + val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required() + val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required() val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required() val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required() @@ -50,6 +52,10 @@ fun main(args: Array) { Files.createDirectory(dir) } + val networkMapAddr = ArtemisMessagingService.makeRecipient(options.valueOf(networkMapAddrArg)) + val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize() + val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity) + // Load oracle stuff (in lieu of having a network map service) val oracleAddr = ArtemisMessagingService.makeRecipient(options.valueOf(oracleAddrArg)) val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize() @@ -67,7 +73,8 @@ fun main(args: Array) { override val exportJMXto: String = "http" override val nearestCity: String = "Atlantis" } - val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, null, advertisedServices).start() } + + val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapAddress, advertisedServices).start() } // Make a garbage transaction that includes a rate fix. val tx = TransactionBuilder() diff --git a/src/main/kotlin/demos/TraderDemo.kt b/src/main/kotlin/demos/TraderDemo.kt index de452b24bd..d9eb27509d 100644 --- a/src/main/kotlin/demos/TraderDemo.kt +++ b/src/main/kotlin/demos/TraderDemo.kt @@ -45,12 +45,8 @@ fun main(args: Array) { val serviceFakeTradesArg = parser.accepts("service-fake-trades") val fakeTradeWithArg = parser.accepts("fake-trade-with").requiredUnless(serviceFakeTradesArg).withRequiredArg() - // Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the - // network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity - // is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly - // to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility). - val timestamperIdentityFile = parser.accepts("timestamper-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg() - val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg() + val networkMapIdentityFile = parser.accepts("network-map-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg() + val networkMapNetAddr = parser.accepts("network-map-address").requiredIf(networkMapIdentityFile).withRequiredArg() val options = try { parser.parse(*args) @@ -81,15 +77,14 @@ fun main(args: Array) { exitProcess(1) } - // The timestamping node runs in the same process as the buyer protocol is run. - val timestamperId = if (options.has(timestamperIdentityFile)) { - val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(Node.DEFAULT_PORT) - val path = Paths.get(options.valueOf(timestamperIdentityFile)) + val networkMapId = if (options.has(networkMapIdentityFile)) { + val addr = HostAndPort.fromString(options.valueOf(networkMapNetAddr)).withDefaultPort(Node.DEFAULT_PORT) + val path = Paths.get(options.valueOf(networkMapIdentityFile)) val party = Files.readAllBytes(path).deserialize() - NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, advertisedServices = setOf(TimestamperService.Type)) + NodeInfo(ArtemisMessagingService.makeRecipient(addr), party, setOf(NetworkMapService.Type)) } else null - val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId, advertisedServices).start() } + val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, networkMapId, advertisedServices).start() } if (listening) { // For demo purposes just extract attachment jars when saved to disk, so the user can explore them. diff --git a/src/main/kotlin/demos/protocols/AutoOfferProtocol.kt b/src/main/kotlin/demos/protocols/AutoOfferProtocol.kt index fcc01942cd..0f4cb5732a 100644 --- a/src/main/kotlin/demos/protocols/AutoOfferProtocol.kt +++ b/src/main/kotlin/demos/protocols/AutoOfferProtocol.kt @@ -57,7 +57,7 @@ object AutoOfferProtocol { progressTracker.currentStep = DEALING // TODO required as messaging layer does not currently queue messages that arrive before we expect them Thread.sleep(100) - val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.timestamperAddress!!, + val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.services.networkMapCache.timestampingNodes.first(), autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.childrenFor[DEALING]!!) val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller) // This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it @@ -93,12 +93,15 @@ object AutoOfferProtocol { @Suspendable override fun call(): SignedTransaction { + require(serviceHub.networkMapCache.timestampingNodes.isNotEmpty()) { "No timestamping nodes registered" } val ourSessionID = random63BitValue() - val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes[0] + val timestampingAuthority = serviceHub.networkMapCache.timestampingNodes.first() // need to pick which ever party is not us val otherParty = notUs(*dealToBeOffered.parties).single() - val otherSide = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name))!!.address + val otherNode = (serviceHub.networkMapCache.nodeForPartyName(otherParty.name)) + requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } } + val otherSide = otherNode!!.address progressTracker.currentStep = ANNOUNCING send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, ourSessionID, dealToBeOffered)) progressTracker.currentStep = DEALING diff --git a/src/test/kotlin/core/MockServices.kt b/src/test/kotlin/core/MockServices.kt index 80ce99aad9..73f1eae9cc 100644 --- a/src/test/kotlin/core/MockServices.kt +++ b/src/test/kotlin/core/MockServices.kt @@ -96,7 +96,8 @@ class MockServices( val net: MessagingService? = null, val identity: IdentityService? = MockIdentityService, val storage: StorageService? = MockStorageService(), - val networkMap: NetworkMapCache? = MockNetworkMapCache(), + val mapCache: NetworkMapCache? = MockNetworkMapCache(), + val mapService: NetworkMapService? = null, val overrideClock: Clock? = Clock.systemUTC() ) : ServiceHub { override val walletService: WalletService = customWallet ?: NodeWalletService(this) @@ -108,7 +109,7 @@ class MockServices( override val networkService: MessagingService get() = net ?: throw UnsupportedOperationException() override val networkMapCache: NetworkMapCache - get() = networkMap ?: throw UnsupportedOperationException() + get() = mapCache ?: throw UnsupportedOperationException() override val storageService: StorageService get() = storage ?: throw UnsupportedOperationException() override val clock: Clock diff --git a/src/test/kotlin/core/messaging/AttachmentTests.kt b/src/test/kotlin/core/messaging/AttachmentTests.kt index 92310ed57c..0e874ae0d6 100644 --- a/src/test/kotlin/core/messaging/AttachmentTests.kt +++ b/src/test/kotlin/core/messaging/AttachmentTests.kt @@ -5,6 +5,7 @@ import core.crypto.SecureHash import core.crypto.sha256 import core.node.NodeConfiguration import core.node.NodeInfo +import core.node.services.NetworkMapService import core.node.services.NodeAttachmentService import core.node.services.ServiceType import core.node.services.TimestamperService @@ -87,10 +88,10 @@ class AttachmentTests { @Test fun maliciousResponse() { // Make a node that doesn't do sanity checking at load time. - val n0 = network.createNode(null, nodeFactory = object : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, + val n0 = network.createNode(null, -1, object : MockNetwork.Factory { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { - return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, id) { + return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) { override fun start(): MockNetwork.MockNode { super.start() (storage.attachments as NodeAttachmentService).checkAttachmentsOnLoad = false @@ -98,7 +99,7 @@ class AttachmentTests { } } } - }, advertisedServices = TimestamperService.Type) + }, NetworkMapService.Type, TimestamperService.Type) val n1 = network.createNode(n0.info) // Insert an attachment into node zero's store directly. diff --git a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt index 81663f91b8..70bc849d19 100644 --- a/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt +++ b/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt @@ -98,8 +98,12 @@ class TwoPartyTradeProtocolTests { var (aliceNode, bobNode) = net.createTwoNodes() val aliceAddr = aliceNode.net.myAddress val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle + val networkMapAddr = aliceNode.info val timestamperAddr = aliceNode.info + // Clear network map registration messages through before we start + net.runNetwork() + (bobNode.wallet as NodeWalletService).fillWithSomeTestCash(2000.DOLLARS) val alicesFakePaper = fillUpForSeller(false, timestamperAddr.identity, null).second @@ -153,10 +157,10 @@ class TwoPartyTradeProtocolTests { // ... bring the node back up ... the act of constructing the SMM will re-register the message handlers // that Bob was waiting on before the reboot occurred. - bobNode = net.createNode(timestamperAddr, bobAddr.id, object : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, + bobNode = net.createNode(networkMapAddr, bobAddr.id, object : MockNetwork.Factory { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { - return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, bobAddr.id) { + return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, bobAddr.id) { override fun initialiseStorageService(dir: Path): StorageService { val ss = super.initialiseStorageService(dir) val smMap = ss.stateMachines @@ -184,11 +188,12 @@ class TwoPartyTradeProtocolTests { // Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order // of gets and puts. private fun makeNodeWithTracking(name: String): MockNetwork.MockNode { + val networkMapAddr: NodeInfo? = null // Create a node in the mock network ... - return net.createNode(nodeFactory = object : MockNetwork.Factory { - override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, timestamperAddr: NodeInfo?, + return net.createNode(null, nodeFactory = object : MockNetwork.Factory { + override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?, advertisedServices: Set, id: Int): MockNetwork.MockNode { - return object : MockNetwork.MockNode(dir, config, network, timestamperAddr, advertisedServices, id) { + return object : MockNetwork.MockNode(dir, config, network, networkMapAddr, advertisedServices, id) { // That constructs the storage service object in a customised way ... override fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party): StorageServiceImpl { // To use RecordingMaps instead of ordinary HashMaps. diff --git a/src/test/kotlin/core/node/services/InMemoryNetworkMapCacheTest.kt b/src/test/kotlin/core/node/services/InMemoryNetworkMapCacheTest.kt new file mode 100644 index 0000000000..7d639f487f --- /dev/null +++ b/src/test/kotlin/core/node/services/InMemoryNetworkMapCacheTest.kt @@ -0,0 +1,23 @@ +package core.node.services + +import core.testing.MockNetwork +import org.junit.Before +import org.junit.Test + +class InMemoryNetworkMapCacheTest { + lateinit var network: MockNetwork + + @Before + fun setup() { + network = MockNetwork() + } + + @Test + fun registerWithNetwork() { + val (n0, n1) = network.createTwoNodes() + + val future = n1.services.networkMapCache.addMapService(n1.smm, n1.net, n0.info, false, null) + network.runNetwork() + future.get() + } +} diff --git a/src/test/kotlin/core/node/services/InMemoryNetworkMapServiceTest.kt b/src/test/kotlin/core/node/services/InMemoryNetworkMapServiceTest.kt new file mode 100644 index 0000000000..0f13582e7d --- /dev/null +++ b/src/test/kotlin/core/node/services/InMemoryNetworkMapServiceTest.kt @@ -0,0 +1,191 @@ +package core.node.services + +import co.paralleluniverse.fibers.Suspendable +import core.* +import core.crypto.SecureHash +import core.crypto.signWithECDSA +import core.node.NodeInfo +import core.protocols.ProtocolLogic +import core.serialization.serialize +import core.testing.MockNetwork +import core.utilities.AddOrRemove +import core.utilities.BriefLogFormatter +import org.junit.Before +import org.junit.Test +import java.security.PrivateKey +import java.time.Instant +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class InMemoryNetworkMapServiceTest { + lateinit var network: MockNetwork + + init { + BriefLogFormatter.init() + } + + @Before + fun setup() { + network = MockNetwork() + } + + /** + * Perform basic tests of registering, de-registering and fetching the full network map. + */ + @Test + fun success() { + val (mapServiceNode, registerNode) = network.createTwoNodes() + val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService + + // Confirm the service contains only its own node + assertEquals(1, service.nodes.count()) + assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) + + // Register the second node + var seq = 1L + val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val nodeKey = registerNode.storage.myLegalIdentityKey + val addChange = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) + val addWireChange = addChange.toWire(nodeKey.private) + service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) + assertEquals(2, service.nodes.count()) + assertEquals(mapServiceNode.info, service.processQueryRequest(NetworkMapService.QueryIdentityRequest(mapServiceNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) + + // Re-registering should be a no-op + service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) + assertEquals(2, service.nodes.count()) + + // Confirm that de-registering the node succeeds and drops it from the node lists + var removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires) + val removeWireChange = removeChange.toWire(nodeKey.private) + assert(service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) + assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) + + // Trying to de-register a node that doesn't exist should fail + assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) + } + + class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash) + : ProtocolLogic() { + @Suspendable + override fun call() { + val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress) + send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.address, 0, req) + } + } + + class TestFetchPSM(val server: NodeInfo, val subscribe: Boolean, val ifChangedSinceVersion: Int? = null) + : ProtocolLogic?>() { + @Suspendable + override fun call(): Collection? { + val sessionID = random63BitValue() + val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID) + return sendAndReceive( + NetworkMapService.FETCH_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + .validate { it.nodes } + } + } + + class TestRegisterPSM(val server: NodeInfo, val reg: NodeRegistration, val privateKey: PrivateKey) + : ProtocolLogic() { + @Suspendable + override fun call(): NetworkMapService.RegistrationResponse { + val sessionID = random63BitValue() + val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID) + + return sendAndReceive( + NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + .validate { it } + } + } + + class TestSubscribePSM(val server: NodeInfo, val subscribe: Boolean) + : ProtocolLogic() { + @Suspendable + override fun call(): NetworkMapService.SubscribeResponse { + val sessionID = random63BitValue() + val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID) + + return sendAndReceive( + NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + .validate { it } + } + } + + @Test + fun successWithNetwork() { + val (mapServiceNode, registerNode) = network.createTwoNodes() + + // Confirm there's a network map service on node 0 + assertNotNull(mapServiceNode.inNodeNetworkMapService) + + // Confirm all nodes have registered themselves + var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) + network.runNetwork() + assertEquals(2, fetchPsm.get()?.count()) + + // Forcibly deregister the second node + val nodeKey = registerNode.storage.myLegalIdentityKey + val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val seq = 2L + val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires) + val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private)) + network.runNetwork() + assertTrue(registerPsm.get().success) + + // Now only map service node should be registered + fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) + network.runNetwork() + assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single()) + } + + @Test + fun subscribeWithNetwork() { + val (mapServiceNode, registerNode) = network.createTwoNodes() + val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService) + + // Test subscribing to updates + val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, + TestSubscribePSM(mapServiceNode.info, true)) + network.runNetwork() + subscribePsm.get() + + // Check the unacknowledged count is zero + assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address)) + + // Fire off an update + val nodeKey = registerNode.storage.myLegalIdentityKey + var seq = 1L + val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) + var wireReg = reg.toWire(nodeKey.private) + service.notifySubscribers(wireReg) + + // Check the unacknowledged count is one + assertEquals(1, service.getUnacknowledgedCount(registerNode.info.address)) + + // Send in an acknowledgment and verify the count goes down + val hash = SecureHash.sha256(wireReg.raw.bits) + val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, + TestAcknowledgePSM(mapServiceNode.info, hash)) + network.runNetwork() + acknowledgePsm.get() + + assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address)) + + // Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit + // is hit. On the last iteration overflow the pending list, and check the node is unsubscribed + for (i in 0..service.maxUnacknowledgedUpdates) { + reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) + wireReg = reg.toWire(nodeKey.private) + service.notifySubscribers(wireReg) + if (i < service.maxUnacknowledgedUpdates) { + assertEquals(i + 1, service.getUnacknowledgedCount(registerNode.info.address)) + } else { + assertNull(service.getUnacknowledgedCount(registerNode.info.address)) + } + } + } +}