diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 32d00262f3..4089096a12 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -381,15 +381,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } private fun registerWithNetworkMapIfConfigured(): ListenableFuture { - require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) { - "Initial network map address must indicate a node that provides a network map service" - } services.networkMapCache.addNode(info) // In the unit test environment, we may run without any network map service sometimes. return if (networkMapAddress == null && inNodeNetworkMapService == null) { services.networkMapCache.runWithoutMapService() noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache - } else { registerWithNetworkMap() } @@ -400,11 +396,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, * updates) if one has been supplied. */ protected open fun registerWithNetworkMap(): ListenableFuture { + require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) { + "Initial network map address must indicate a node that provides a network map service" + } val address = networkMapAddress ?: info.address // Register for updates, even if we're the one running the network map. return sendNetworkMapRegistration(address).flatMap { response -> - check(response.success) { "The network map service rejected our registration request" } - // This Future will complete on the same executor as sendNetworkMapRegistration, namely the one used by net + check(response.error == null) { "Unable to register with the network map service: ${response.error}" } + // The future returned addMapService will complete on the same executor as sendNetworkMapRegistration, namely the one used by net services.networkMapCache.addMapService(net, address, true, null) } } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt index 8d8e6302cf..fba01f9d37 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt @@ -2,6 +2,7 @@ package net.corda.node.services.network import com.google.common.annotations.VisibleForTesting import kotlinx.support.jdk8.collections.compute +import kotlinx.support.jdk8.collections.removeIf import net.corda.core.ThreadBox import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party @@ -24,7 +25,16 @@ import net.corda.core.utilities.loggerFor import net.corda.flows.ServiceRequestMessage import net.corda.node.services.api.AbstractNodeService import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.network.NetworkMapService.* +import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC import net.corda.node.utilities.AddOrRemove +import net.corda.node.utilities.AddOrRemove.ADD +import net.corda.node.utilities.AddOrRemove.REMOVE import java.security.PrivateKey import java.security.SignatureException import java.time.Instant @@ -62,38 +72,36 @@ interface NetworkMapService { // Base topic for messages acknowledging pushed updates val PUSH_ACK_TOPIC = "platform.network_map.push_ack" - val logger = loggerFor() - val type = ServiceType.corda.getSubType("network_map") } - val nodes: List - - class FetchMapRequest(val subscribe: Boolean, - val ifChangedSinceVersion: Int?, - override val replyTo: SingleMessageRecipient, - override val sessionID: Long = random63BitValue()) : ServiceRequestMessage + data class FetchMapRequest(val subscribe: Boolean, + val ifChangedSinceVersion: Int?, + override val replyTo: SingleMessageRecipient, + override val sessionID: Long = random63BitValue()) : ServiceRequestMessage @CordaSerializable - data class FetchMapResponse(val nodes: Collection?, val version: Int) + data class FetchMapResponse(val nodes: List?, val version: Int) - class QueryIdentityRequest(val identity: Party, - override val replyTo: SingleMessageRecipient, - override val sessionID: Long) : ServiceRequestMessage + data class QueryIdentityRequest(val identity: Party, + override val replyTo: SingleMessageRecipient, + override val sessionID: Long = random63BitValue()) : ServiceRequestMessage @CordaSerializable data class QueryIdentityResponse(val node: NodeInfo?) - class RegistrationRequest(val wireReg: WireNodeRegistration, - override val replyTo: SingleMessageRecipient, - override val sessionID: Long = random63BitValue()) : ServiceRequestMessage + // TODO Rename this RegistractionChangeRequest or similar (and related classes) + data class RegistrationRequest(val wireReg: WireNodeRegistration, + override val replyTo: SingleMessageRecipient, + override val sessionID: Long = random63BitValue()) : ServiceRequestMessage + /** If [error] is null then the registration was successful. If not null then it wasn't and it explains why */ @CordaSerializable - data class RegistrationResponse(val success: Boolean) + data class RegistrationResponse(val error: String?) - class SubscribeRequest(val subscribe: Boolean, - override val replyTo: SingleMessageRecipient, - override val sessionID: Long = random63BitValue()) : ServiceRequestMessage + data class SubscribeRequest(val subscribe: Boolean, + override val replyTo: SingleMessageRecipient, + override val sessionID: Long = random63BitValue()) : ServiceRequestMessage @CordaSerializable data class SubscribeResponse(val confirmed: Boolean) @@ -107,7 +115,7 @@ interface NetworkMapService { @ThreadSafe class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) { - override val registeredNodes: MutableMap = ConcurrentHashMap() + override val nodeRegistrations: MutableMap = ConcurrentHashMap() override val subscribers = ThreadBox(mutableMapOf()) init { @@ -122,9 +130,17 @@ class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkM * subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks. */ @ThreadSafe -abstract class AbstractNetworkMapService -(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) { - protected abstract val registeredNodes: MutableMap +abstract class AbstractNetworkMapService(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) { + companion object { + /** + * Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a + * 10 times overhead. + */ + private const val MAX_SIZE_REGISTRATION_REQUEST_BYTES = 5500 + private val logger = loggerFor() + } + + protected abstract val nodeRegistrations: MutableMap // Map from subscriber address, to most recently acknowledged update map version. protected abstract val subscribers: ThreadBox> @@ -135,38 +151,19 @@ abstract class AbstractNetworkMapService val mapVersion: Int get() = _mapVersion.get() - private fun mapVersionIncrementAndGet(): Int = _mapVersion.incrementAndGet() - /** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */ val maxUnacknowledgedUpdates = 10 - /** - * Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a - * 10 times overhead. - */ - val maxSizeRegistrationRequestBytes = 5500 private val handlers = ArrayList() - // Filter reduces this to the entries that add a node to the map - override val nodes: List - get() = registeredNodes.mapNotNull { if (it.value.reg.type == AddOrRemove.ADD) it.value.reg.node else null } - protected fun setup() { // Register message handlers - handlers += addMessageHandler(NetworkMapService.FETCH_TOPIC, - { req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) } - ) - handlers += addMessageHandler(NetworkMapService.QUERY_TOPIC, - { req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) } - ) - handlers += addMessageHandler(NetworkMapService.REGISTER_TOPIC, - { req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) } - ) - handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_TOPIC, - { req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) } - ) - handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r -> - val req = message.data.deserialize() + handlers += addMessageHandler(FETCH_TOPIC) { req: FetchMapRequest -> processFetchAllRequest(req) } + handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) } + handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) } + handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) } + handlers += net.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r -> + val req = message.data.deserialize() processAcknowledge(req) } } @@ -193,135 +190,105 @@ abstract class AbstractNetworkMapService subscribers.locked { remove(subscriber) } } - @VisibleForTesting - fun getUnacknowledgedCount(subscriber: SingleMessageRecipient, mapVersion: Int): Int? { - return subscribers.locked { - val subscriberMapVersion = get(subscriber)?.mapVersion - if (subscriberMapVersion != null) { - mapVersion - subscriberMapVersion - } else { - null + private fun processAcknowledge(request: UpdateAcknowledge): Unit { + if (request.replyTo !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber() + subscribers.locked { + val lastVersionAcked = this[request.replyTo]?.mapVersion + if ((lastVersionAcked ?: 0) < request.mapVersion) { + this[request.replyTo] = LastAcknowledgeInfo(request.mapVersion) } } } - @VisibleForTesting - fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) { + private fun processFetchAllRequest(request: FetchMapRequest): FetchMapResponse { + if (request.subscribe) { + addSubscriber(request.replyTo) + } + val currentVersion = mapVersion + val nodeRegistrations = if (request.ifChangedSinceVersion == null || request.ifChangedSinceVersion < currentVersion) { + // We return back the current state of the entire map including nodes that have been removed + ArrayList(nodeRegistrations.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals + } else { + null + } + return FetchMapResponse(nodeRegistrations, currentVersion) + } + + private fun processQueryRequest(request: QueryIdentityRequest): QueryIdentityResponse { + val candidate = nodeRegistrations[request.identity]?.reg + // If the most recent record we have is of the node being removed from the map, then it's considered + // as no match. + val node = if (candidate == null || candidate.type == REMOVE) null else candidate.node + return QueryIdentityResponse(node) + } + + private fun processRegistrationRequest(request: RegistrationRequest): RegistrationResponse { + if (request.wireReg.raw.size > MAX_SIZE_REGISTRATION_REQUEST_BYTES) return RegistrationResponse("Request is too big") + + val change = try { + request.wireReg.verified() + } catch(e: SignatureException) { + return RegistrationResponse("Invalid signature on request") + } + + val node = change.node + + // Update the current value atomically, so that if multiple updates come + // in on different threads, there is no risk of a race condition while checking + // sequence numbers. + val registrationInfo = try { + nodeRegistrations.compute(node.legalIdentity) { mapKey: Party, existing: NodeRegistrationInfo? -> + require(!((existing == null || existing.reg.type == REMOVE) && change.type == REMOVE)) { + "Attempting to de-register unknown node" + } + require(existing == null || existing.reg.serial < change.serial) { "Serial value is too small" } + NodeRegistrationInfo(change, _mapVersion.incrementAndGet()) + } + } catch (e: IllegalArgumentException) { + return RegistrationResponse(e.message) + } + + notifySubscribers(request.wireReg, registrationInfo!!.mapVersion) + + // Update the local cache + // TODO: Once local messaging is fixed, this should go over the network layer as it does to other + // subscribers + when (change.type) { + ADD -> { + logger.info("Added node ${node.address} to network map") + services.networkMapCache.addNode(change.node) + } + REMOVE -> { + logger.info("Removed node ${node.address} from network map") + services.networkMapCache.removeNode(change.node) + } + } + + return RegistrationResponse(null) + } + + private fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) { // TODO: Once we have a better established messaging system, we can probably send // to a MessageRecipientGroup that nodes join/leave, rather than the network map // service itself managing the group val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes - val message = net.createMessage(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID, update) + val message = net.createMessage(PUSH_TOPIC, DEFAULT_SESSION_ID, update) subscribers.locked { - val toRemove = mutableListOf() - forEach { subscriber: Map.Entry -> - val unacknowledgedCount = mapVersion - subscriber.value.mapVersion - // TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst. - if (unacknowledgedCount <= maxUnacknowledgedUpdates) { - net.send(message, subscriber.key) - } else { - toRemove.add(subscriber.key) - } - } - toRemove.forEach { remove(it) } + // Remove any stale subscribers + values.removeIf { lastAckInfo -> mapVersion - lastAckInfo.mapVersion > maxUnacknowledgedUpdates } + // TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst. + keys.forEach { recipient -> net.send(message, recipient) } } } - @VisibleForTesting - fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit { - if (req.replyTo !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber() - subscribers.locked { - val lastVersionAcked = this[req.replyTo]?.mapVersion - if ((lastVersionAcked ?: 0) < req.mapVersion) { - this[req.replyTo] = LastAcknowledgeInfo(req.mapVersion) - } - } - } - - @VisibleForTesting - fun processFetchAllRequest(req: NetworkMapService.FetchMapRequest): NetworkMapService.FetchMapResponse { - if (req.subscribe) { - addSubscriber(req.replyTo) - } - val ver = mapVersion - if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) { - val nodes = ArrayList(registeredNodes.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals - return NetworkMapService.FetchMapResponse(nodes, ver) + private fun processSubscriptionRequest(request: SubscribeRequest): SubscribeResponse { + if (request.subscribe) { + addSubscriber(request.replyTo) } else { - return NetworkMapService.FetchMapResponse(null, ver) + removeSubscriber(request.replyTo) } - } - - @VisibleForTesting - fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse { - val candidate = registeredNodes[req.identity]?.reg - - // If the most recent record we have is of the node being removed from the map, then it's considered - // as no match. - if (candidate == null || candidate.type == AddOrRemove.REMOVE) { - return NetworkMapService.QueryIdentityResponse(null) - } else { - return NetworkMapService.QueryIdentityResponse(candidate.node) - } - } - - @VisibleForTesting - fun processRegistrationChangeRequest(req: NetworkMapService.RegistrationRequest): NetworkMapService.RegistrationResponse { - require(req.wireReg.raw.size < maxSizeRegistrationRequestBytes) - val change: NodeRegistration - - try { - change = req.wireReg.verified() - } catch(e: SignatureException) { - throw NodeMapError.InvalidSignature() - } - val node = change.node - - var changed: Boolean = false - // Update the current value atomically, so that if multiple updates come - // in on different threads, there is no risk of a race condition while checking - // sequence numbers. - val registrationInfo = registeredNodes.compute(node.legalIdentity, { mapKey: Party, existing: NodeRegistrationInfo? -> - changed = existing == null || existing.reg.serial < change.serial - if (changed) { - when (change.type) { - AddOrRemove.ADD -> NodeRegistrationInfo(change, mapVersionIncrementAndGet()) - AddOrRemove.REMOVE -> NodeRegistrationInfo(change, mapVersionIncrementAndGet()) - else -> throw NodeMapError.UnknownChangeType() - } - } else { - existing - } - }) - if (changed) { - notifySubscribers(req.wireReg, registrationInfo!!.mapVersion) - - // Update the local cache - // TODO: Once local messaging is fixed, this should go over the network layer as it does to other - // subscribers - when (change.type) { - AddOrRemove.ADD -> { - NetworkMapService.logger.info("Added node ${node.address} to network map") - services.networkMapCache.addNode(change.node) - } - AddOrRemove.REMOVE -> { - NetworkMapService.logger.info("Removed node ${node.address} from network map") - services.networkMapCache.removeNode(change.node) - } - } - - } - return NetworkMapService.RegistrationResponse(changed) - } - - @VisibleForTesting - fun processSubscriptionRequest(req: NetworkMapService.SubscribeRequest): NetworkMapService.SubscribeResponse { - when (req.subscribe) { - false -> removeSubscriber(req.replyTo) - true -> addSubscriber(req.replyTo) - } - return NetworkMapService.SubscribeResponse(true) + return SubscribeResponse(true) } } @@ -339,7 +306,7 @@ abstract class AbstractNetworkMapService // TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node // involves providing both node and paerty, and deregistering a node involves a request with party but no node. @CordaSerializable -class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) { +data class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) { /** * Build a node registration in wire format. */ @@ -372,9 +339,6 @@ sealed class NodeMapError : Exception() { /** Thrown if the replyTo of a subscription change message is not a single message recipient */ class InvalidSubscriber : NodeMapError() - - /** Thrown if a change arrives which is of an unknown type */ - class UnknownChangeType : NodeMapError() } @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapService.kt index df15a9b06d..ccdd5dbebf 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapService.kt @@ -23,7 +23,7 @@ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetwor val registrationInfo = blob("node_registration_info") } - override val registeredNodes: MutableMap = synchronizedMap(object : AbstractJDBCHashMap(Table, loadOnInit = true) { + override val nodeRegistrations: MutableMap = synchronizedMap(object : AbstractJDBCHashMap(Table, loadOnInit = true) { override fun keyFromRow(row: ResultRow): Party = Party(row[table.nodeParty.name], row[table.nodeParty.owningKey]) override fun valueFromRow(row: ResultRow): NodeRegistrationInfo = deserializeFromBlob(row[table.registrationInfo]) @@ -42,7 +42,7 @@ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetwor init { // Initialise the network map version with the current highest persisted version, or zero if there are no entries. - _mapVersion.set(registeredNodes.values.map { it.mapVersion }.max() ?: 0) + _mapVersion.set(nodeRegistrations.values.map { it.mapVersion }.max() ?: 0) setup() } } diff --git a/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt new file mode 100644 index 0000000000..f25193f0af --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt @@ -0,0 +1,271 @@ +package net.corda.node.services + +import com.google.common.util.concurrent.ListenableFuture +import net.corda.core.getOrThrow +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.messaging.send +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.DEFAULT_SESSION_ID +import net.corda.core.node.services.ServiceInfo +import net.corda.core.serialization.deserialize +import net.corda.flows.sendRequest +import net.corda.node.services.AbstractNetworkMapServiceTest.Changed.Added +import net.corda.node.services.AbstractNetworkMapServiceTest.Changed.Removed +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.network.AbstractNetworkMapService +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.network.NetworkMapService.* +import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC +import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC +import net.corda.node.services.network.NodeRegistration +import net.corda.node.utilities.AddOrRemove +import net.corda.node.utilities.AddOrRemove.ADD +import net.corda.node.utilities.AddOrRemove.REMOVE +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.MockNetwork.MockNode +import org.assertj.core.api.Assertions.assertThat +import org.eclipse.jetty.util.BlockingArrayQueue +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.math.BigInteger +import java.security.KeyPair +import java.time.Instant + +abstract class AbstractNetworkMapServiceTest { + lateinit var network: MockNetwork + lateinit var mapServiceNode: MockNode + lateinit var alice: MockNode + + @Before + fun setup() { + network = MockNetwork(defaultFactory = nodeFactory) + network.createTwoNodes(firstNodeName = "map service", secondNodeName = "alice").apply { + mapServiceNode = first + alice = second + } + network.runNetwork() + } + + @After + fun tearDown() { + network.stopNodes() + } + + protected abstract val nodeFactory: MockNetwork.Factory + + protected abstract val networkMapService: S + + // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. + protected abstract fun swizzle() + + @Test + fun `all nodes register themselves`() { + // setup has run the network and so we immediately expect the network map service to be correctly populated + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) + assertThat(alice.identityQuery()).isEqualTo(alice.info) + assertThat(mapServiceNode.identityQuery()).isEqualTo(mapServiceNode.info) + } + + @Test + fun `re-register the same node`() { + val response = alice.registration(ADD) + swizzle() + assertThat(response.getOrThrow().error).isNull() + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) // Confirm it's a no-op + } + + @Test + fun `re-register with smaller serial value`() { + val response = alice.registration(ADD, serial = 1) + swizzle() + assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) // Confirm it's a no-op + } + + @Test + fun `de-register node`() { + val response = alice.registration(REMOVE) + swizzle() + assertThat(response.getOrThrow().error).isNull() + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Removed(alice)) + swizzle() + assertThat(alice.identityQuery()).isNull() + assertThat(mapServiceNode.identityQuery()).isEqualTo(mapServiceNode.info) + } + + @Test + fun `de-register same node again`() { + alice.registration(REMOVE) + val response = alice.registration(REMOVE) + swizzle() + assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Removed(alice)) + } + + @Test + fun `de-register unknown node`() { + val bob = newNodeSeparateFromNetworkMap("Bob") + val response = bob.registration(REMOVE) + swizzle() + assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back + assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) + } + + @Test + fun `subscribed while new node registers`() { + val updates = alice.subscribe() + swizzle() + val bob = addNewNodeToNetworkMap("Bob") + swizzle() + val update = updates.single() + assertThat(update.mapVersion).isEqualTo(networkMapService.mapVersion) + assertThat(update.wireReg.verified().toChanged()).isEqualTo(Added(bob.info)) + } + + @Test + fun `subscribed while node de-registers`() { + val bob = addNewNodeToNetworkMap("Bob") + val updates = alice.subscribe() + bob.registration(REMOVE) + swizzle() + assertThat(updates.map { it.wireReg.verified().toChanged() }).containsOnly(Removed(bob.info)) + } + + @Test + fun unsubscribe() { + val updates = alice.subscribe() + val bob = addNewNodeToNetworkMap("Bob") + alice.unsubscribe() + addNewNodeToNetworkMap("Charlie") + swizzle() + assertThat(updates.map { it.wireReg.verified().toChanged() }).containsOnly(Added(bob.info)) + } + + @Test + fun `surpass unacknowledged update limit`() { + val subscriber = newNodeSeparateFromNetworkMap("Subscriber") + val updates = subscriber.subscribe() + val bob = addNewNodeToNetworkMap("Bob") + var serial = updates.first().wireReg.verified().serial + repeat(networkMapService.maxUnacknowledgedUpdates) { + bob.registration(ADD, serial = ++serial) + swizzle() + } + // We sent maxUnacknowledgedUpdates + 1 updates - the last one will be missed + assertThat(updates).hasSize(networkMapService.maxUnacknowledgedUpdates) + } + + @Test + fun `delay sending update ack until just before unacknowledged update limit`() { + val subscriber = newNodeSeparateFromNetworkMap("Subscriber") + val updates = subscriber.subscribe() + val bob = addNewNodeToNetworkMap("Bob") + var serial = updates.first().wireReg.verified().serial + repeat(networkMapService.maxUnacknowledgedUpdates - 1) { + bob.registration(ADD, serial = ++serial) + swizzle() + } + // Subscriber will receive maxUnacknowledgedUpdates updates before sending ack + subscriber.ackUpdate(updates.last().mapVersion) + swizzle() + bob.registration(ADD, serial = ++serial) + assertThat(updates).hasSize(networkMapService.maxUnacknowledgedUpdates + 1) + assertThat(updates.last().wireReg.verified().serial).isEqualTo(serial) + } + + private fun MockNode.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List { + val request = FetchMapRequest(subscribe, ifChangedSinceVersion, info.address) + val response = services.networkService.sendRequest(FETCH_TOPIC, request, mapServiceNode.info.address) + network.runNetwork() + return response.getOrThrow().nodes?.map { it.toChanged() } ?: emptyList() + } + + private fun NodeRegistration.toChanged(): Changed = when (type) { + ADD -> Added(node) + REMOVE -> Removed(node) + } + + private fun MockNode.identityQuery(): NodeInfo? { + val request = QueryIdentityRequest(info.legalIdentity, info.address) + val response = services.networkService.sendRequest(QUERY_TOPIC, request, mapServiceNode.info.address) + network.runNetwork() + return response.getOrThrow().node + } + + private fun MockNode.registration(addOrRemove: AddOrRemove, + serial: Long = System.currentTimeMillis()): ListenableFuture { + val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val nodeRegistration = NodeRegistration(info, serial, addOrRemove, expires) + val request = RegistrationRequest(nodeRegistration.toWire(services.legalIdentityKey.private), info.address) + val response = services.networkService.sendRequest(REGISTER_TOPIC, request, mapServiceNode.info.address) + network.runNetwork() + return response + } + + private fun MockNode.subscribe(): List { + val request = SubscribeRequest(true, info.address) + val updates = BlockingArrayQueue() + services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r -> + updates += message.data.deserialize() + } + val response = services.networkService.sendRequest(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address) + network.runNetwork() + assertThat(response.getOrThrow().confirmed).isTrue() + return updates + } + + private fun MockNode.unsubscribe() { + val request = SubscribeRequest(false, info.address) + val response = services.networkService.sendRequest(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address) + network.runNetwork() + assertThat(response.getOrThrow().confirmed).isTrue() + } + + private fun MockNode.ackUpdate(mapVersion: Int) { + val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress) + services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.info.address) + network.runNetwork() + } + + private fun addNewNodeToNetworkMap(legalName: String): MockNode { + val node = network.createNode(networkMapAddress = mapServiceNode.info.address, legalName = legalName) + network.runNetwork() + return node + } + + private fun newNodeSeparateFromNetworkMap(legalName: String): MockNode { + return network.createNode(legalName = legalName, nodeFactory = NoNMSNodeFactory) + } + + sealed class Changed(val node: NodeInfo) { + override fun equals(other: Any?): Boolean = other?.javaClass == this.javaClass && (other as Changed).node == this.node + override fun hashCode(): Int = node.hashCode() + override fun toString(): String = "${javaClass.simpleName}($node)" + + class Added(node: NodeInfo) : Changed(node) { + constructor(node: MockNode) : this(node.info) + } + class Removed(node: NodeInfo) : Changed(node) { + constructor(node: MockNode) : this(node.info) + } + } + + private object NoNMSNodeFactory : MockNetwork.Factory { + override fun create(config: NodeConfiguration, + network: MockNetwork, + networkMapAddr: SingleMessageRecipient?, + advertisedServices: Set, + id: Int, + overrideServices: Map?, + entropyRoot: BigInteger): MockNode { + return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { + override fun makeNetworkMapService() {} + } + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt index 1eb68509ff..99e7e696bb 100644 --- a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapServiceTest.kt @@ -1,221 +1,10 @@ package net.corda.node.services -import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.getOrThrow -import net.corda.core.map -import net.corda.core.messaging.send -import net.corda.core.node.services.DEFAULT_SESSION_ID -import net.corda.flows.sendRequest -import net.corda.node.services.network.AbstractNetworkMapService import net.corda.node.services.network.InMemoryNetworkMapService -import net.corda.node.services.network.NetworkMapService -import net.corda.node.services.network.NetworkMapService.* -import net.corda.node.services.network.NodeRegistration -import net.corda.node.utilities.AddOrRemove -import net.corda.node.utilities.databaseTransaction import net.corda.testing.node.MockNetwork -import net.corda.testing.node.MockNetwork.MockNode -import org.junit.Before -import org.junit.Test -import java.security.PrivateKey -import java.time.Instant -import java.util.concurrent.Future -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertNull -import kotlin.test.assertTrue -/** - * Abstracted out test logic to be re-used by [PersistentNetworkMapServiceTest]. - */ -abstract class AbstractNetworkMapServiceTest { - - protected fun success(mapServiceNode: MockNode, - registerNode: MockNode, - service: () -> AbstractNetworkMapService, - swizzle: () -> Unit) { - // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. - swizzle() - - // Confirm the service contains no nodes as own node only registered if network is run. - assertEquals(0, service().nodes.count()) - assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node) - - // Register the new node - val instant = Instant.now() - val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - val nodeKey = registerNode.services.legalIdentityKey - val addChange = NodeRegistration(registerNode.info, instant.toEpochMilli(), AddOrRemove.ADD, expires) - val addWireChange = addChange.toWire(nodeKey.private) - service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) - swizzle() - - assertEquals(1, service().nodes.count()) - assertEquals(registerNode.info, service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node) - - // Re-registering should be a no-op - service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) - swizzle() - - assertEquals(1, service().nodes.count()) - - // Confirm that de-registering the node succeeds and drops it from the node lists - val removeChange = NodeRegistration(registerNode.info, instant.toEpochMilli() + 1, AddOrRemove.REMOVE, expires) - val removeWireChange = removeChange.toWire(nodeKey.private) - assert(service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) - swizzle() - - assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node) - swizzle() - - // Trying to de-register a node that doesn't exist should fail - assert(!service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) - } - - protected fun `success with network`(network: MockNetwork, - mapServiceNode: MockNode, - registerNode: MockNode, - swizzle: () -> Unit) { - // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. - swizzle() - - // Confirm all nodes have registered themselves - network.runNetwork() - var fetchResult = registerNode.fetchMap(mapServiceNode, false) - network.runNetwork() - assertEquals(2, fetchResult.getOrThrow()?.count()) - - // Forcibly deregister the second node - val nodeKey = registerNode.services.legalIdentityKey - val instant = Instant.now() - val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - val reg = NodeRegistration(registerNode.info, instant.toEpochMilli() + 1, AddOrRemove.REMOVE, expires) - val registerResult = registerNode.registration(mapServiceNode, reg, nodeKey.private) - network.runNetwork() - assertTrue(registerResult.getOrThrow().success) - - swizzle() - - // Now only map service node should be registered - fetchResult = registerNode.fetchMap(mapServiceNode, false) - network.runNetwork() - assertEquals(mapServiceNode.info, fetchResult.getOrThrow()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single()) - } - - protected fun `subscribe with network`(network: MockNetwork, - mapServiceNode: MockNode, - registerNode: MockNode, - service: () -> AbstractNetworkMapService, - swizzle: () -> Unit) { - // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. - swizzle() - - // Test subscribing to updates - network.runNetwork() - val subscribeResult = registerNode.subscribe(mapServiceNode, true) - network.runNetwork() - subscribeResult.getOrThrow() - - swizzle() - - val startingMapVersion = service().mapVersion - - // Check the unacknowledged count is zero - assertEquals(0, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion)) - - // Fire off an update - val nodeKey = registerNode.services.legalIdentityKey - var seq = 0L - val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) - var wireReg = reg.toWire(nodeKey.private) - service().notifySubscribers(wireReg, startingMapVersion + 1) - - swizzle() - - // Check the unacknowledged count is one - assertEquals(1, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1)) - - // Send in an acknowledgment and verify the count goes down - registerNode.updateAcknowlege(mapServiceNode, startingMapVersion + 1) - network.runNetwork() - - swizzle() - - assertEquals(0, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1)) - - // Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit - // is hit. On the last iteration overflow the pending list, and check the node is unsubscribed - for (i in 0..service().maxUnacknowledgedUpdates) { - reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) - wireReg = reg.toWire(nodeKey.private) - service().notifySubscribers(wireReg, i + startingMapVersion + 2) - - swizzle() - - if (i < service().maxUnacknowledgedUpdates) { - assertEquals(i + 1, service().getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2)) - } else { - assertNull(service().getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2)) - } - } - } - - private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture { - val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress) - return services.networkService.sendRequest(NetworkMapService.REGISTER_TOPIC, req, mapServiceNode.info.address) - } - - private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture { - val req = SubscribeRequest(subscribe, services.networkService.myAddress) - return services.networkService.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, mapServiceNode.info.address) - } - - private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) { - val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress) - services.networkService.send(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address) - } - - private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future?> { - val net = services.networkService - val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress) - return net.sendRequest(NetworkMapService.FETCH_TOPIC, req, mapServiceNode.info.address).map { it.nodes } - } -} - -class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest() { - lateinit var network: MockNetwork - - @Before - fun setup() { - network = MockNetwork() - } - - /** - * Perform basic tests of registering, de-registering and fetching the full network map. - */ - @Test - fun success() { - val (mapServiceNode, registerNode) = network.createTwoNodes() - val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService - databaseTransaction(mapServiceNode.database) { - success(mapServiceNode, registerNode, { service }, { }) - } - } - - @Test - fun `success with network`() { - val (mapServiceNode, registerNode) = network.createTwoNodes() - - // Confirm there's a network map service on node 0 - assertNotNull(mapServiceNode.inNodeNetworkMapService) - `success with network`(network, mapServiceNode, registerNode, { }) - } - - @Test - fun `subscribe with network`() { - val (mapServiceNode, registerNode) = network.createTwoNodes() - val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService) - `subscribe with network`(network, mapServiceNode, registerNode, { service }, { }) - } +class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest() { + override val nodeFactory: MockNetwork.Factory get() = MockNetwork.DefaultFactory + override val networkMapService: InMemoryNetworkMapService get() = mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService + override fun swizzle() = Unit } diff --git a/node/src/test/kotlin/net/corda/node/services/PersistentNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/PersistentNetworkMapServiceTest.kt index aa3f73a90e..40b6305926 100644 --- a/node/src/test/kotlin/net/corda/node/services/PersistentNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/PersistentNetworkMapServiceTest.kt @@ -1,67 +1,43 @@ package net.corda.node.services import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.network.AbstractNetworkMapService -import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.PersistentNetworkMapService import net.corda.node.utilities.databaseTransaction import net.corda.testing.node.MockNetwork -import org.junit.After -import org.junit.Before -import org.junit.Test +import net.corda.testing.node.MockNetwork.MockNode import java.math.BigInteger import java.security.KeyPair -import java.security.KeyPairGeneratorSpi /** * This class mirrors [InMemoryNetworkMapServiceTest] but switches in a [PersistentNetworkMapService] and * repeatedly replaces it with new instances to check that the service correctly restores the most recent state. */ -class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() { - lateinit var network: MockNetwork +class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() { - @Before - fun setup() { - network = MockNetwork() - } + override val nodeFactory: MockNetwork.Factory get() = NodeFactory - @After - fun tearDown() { - network.stopNodes() - } + override val networkMapService: PersistentNetworkMapService + get() = (mapServiceNode.inNodeNetworkMapService as SwizzleNetworkMapService).delegate - /** - * We use a special [NetworkMapService] that allows us to switch in a new instance at any time to check that the - * state within it is correctly restored. - */ - private class SwizzleNetworkMapService(services: ServiceHubInternal) : NetworkMapService { - var delegate: AbstractNetworkMapService = InMemoryNetworkMapService(services) - - override val nodes: List - get() = delegate.nodes - - fun swizzle() { - delegate.unregisterNetworkHandlers() - delegate = makeNetworkMapService(delegate.services) - } - - private fun makeNetworkMapService(services: ServiceHubInternal): AbstractNetworkMapService { - return PersistentNetworkMapService(services) + override fun swizzle() { + databaseTransaction(mapServiceNode.database) { + (mapServiceNode.inNodeNetworkMapService as SwizzleNetworkMapService).swizzle() } } private object NodeFactory : MockNetwork.Factory { - override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, - advertisedServices: Set, id: Int, + override fun create(config: NodeConfiguration, + network: MockNetwork, + networkMapAddr: SingleMessageRecipient?, + advertisedServices: Set, + id: Int, overrideServices: Map?, - entropyRoot: BigInteger): MockNetwork.MockNode { - return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { - + entropyRoot: BigInteger): MockNode { + return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { override fun makeNetworkMapService() { inNodeNetworkMapService = SwizzleNetworkMapService(services) } @@ -70,42 +46,15 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() { } /** - * Perform basic tests of registering, de-registering and fetching the full network map. - * - * TODO: make the names of these and those in [AbstractNetworkMapServiceTest] and [InMemoryNetworkMapServiceTest] more - * meaningful. + * We use a special [NetworkMapService] that allows us to switch in a new instance at any time to check that the + * state within it is correctly restored. */ - @Test - fun success() { - val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory) - val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService + private class SwizzleNetworkMapService(val services: ServiceHubInternal) : NetworkMapService { + var delegate: PersistentNetworkMapService = PersistentNetworkMapService(services) - databaseTransaction(mapServiceNode.database) { - success(mapServiceNode, registerNode, { service.delegate }, { service.swizzle() }) - } - } - - @Test - fun `success with network`() { - val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory) - - // Confirm there's a network map service on node 0 - val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService - - databaseTransaction(mapServiceNode.database) { - `success with network`(network, mapServiceNode, registerNode, { service.swizzle() }) - } - } - - @Test - fun `subscribe with network`() { - val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory) - - // Confirm there's a network map service on node 0 - val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService - - databaseTransaction(mapServiceNode.database) { - `subscribe with network`(network, mapServiceNode, registerNode, { service.delegate }, { service.swizzle() }) + fun swizzle() { + delegate.unregisterNetworkHandlers() + delegate = PersistentNetworkMapService(services) } } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 1e28762ac6..4879eace01 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -304,7 +304,10 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, * Sets up a two node network, in which the first node runs network map and notary services and the other * doesn't. */ - fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair { + fun createTwoNodes(firstNodeName: String? = null, + secondNodeName: String? = null, + nodeFactory: Factory = defaultFactory, + notaryKeyPair: KeyPair? = null): Pair { require(nodes.isEmpty()) val notaryServiceInfo = ServiceInfo(SimpleNotaryService.type) val notaryOverride = if (notaryKeyPair != null) @@ -312,8 +315,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, else null return Pair( - createNode(null, -1, nodeFactory, true, null, notaryOverride, BigInteger.valueOf(random63BitValue()), ServiceInfo(NetworkMapService.type), notaryServiceInfo), - createNode(nodes[0].info.address, -1, nodeFactory, true, null) + createNode(null, -1, nodeFactory, true, firstNodeName, notaryOverride, BigInteger.valueOf(random63BitValue()), ServiceInfo(NetworkMapService.type), notaryServiceInfo), + createNode(nodes[0].info.address, -1, nodeFactory, true, secondNodeName) ) }