Clean up of network map service and its tests, and added error msg to registration response

This commit is contained in:
Shams Asari 2017-03-13 11:46:16 +00:00
parent 391270ed71
commit 5e3e7f6c1c
7 changed files with 440 additions and 465 deletions

View File

@ -381,15 +381,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun registerWithNetworkMapIfConfigured(): ListenableFuture<Unit> {
require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) {
"Initial network map address must indicate a node that provides a network map service"
}
services.networkMapCache.addNode(info)
// In the unit test environment, we may run without any network map service sometimes.
return if (networkMapAddress == null && inNodeNetworkMapService == null) {
services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else {
registerWithNetworkMap()
}
@ -400,11 +396,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
* updates) if one has been supplied.
*/
protected open fun registerWithNetworkMap(): ListenableFuture<Unit> {
require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) {
"Initial network map address must indicate a node that provides a network map service"
}
val address = networkMapAddress ?: info.address
// Register for updates, even if we're the one running the network map.
return sendNetworkMapRegistration(address).flatMap { response ->
check(response.success) { "The network map service rejected our registration request" }
// This Future will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
check(response.error == null) { "Unable to register with the network map service: ${response.error}" }
// The future returned addMapService will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
services.networkMapCache.addMapService(net, address, true, null)
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import kotlinx.support.jdk8.collections.compute
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
@ -24,7 +25,16 @@ import net.corda.core.utilities.loggerFor
import net.corda.flows.ServiceRequestMessage
import net.corda.node.services.api.AbstractNodeService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AddOrRemove.REMOVE
import java.security.PrivateKey
import java.security.SignatureException
import java.time.Instant
@ -62,38 +72,36 @@ interface NetworkMapService {
// Base topic for messages acknowledging pushed updates
val PUSH_ACK_TOPIC = "platform.network_map.push_ack"
val logger = loggerFor<NetworkMapService>()
val type = ServiceType.corda.getSubType("network_map")
}
val nodes: List<NodeInfo>
class FetchMapRequest(val subscribe: Boolean,
val ifChangedSinceVersion: Int?,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
data class FetchMapRequest(val subscribe: Boolean,
val ifChangedSinceVersion: Int?,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
data class FetchMapResponse(val nodes: List<NodeRegistration>?, val version: Int)
class QueryIdentityRequest(val identity: Party,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long) : ServiceRequestMessage
data class QueryIdentityRequest(val identity: Party,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class QueryIdentityResponse(val node: NodeInfo?)
class RegistrationRequest(val wireReg: WireNodeRegistration,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
// TODO Rename this RegistractionChangeRequest or similar (and related classes)
data class RegistrationRequest(val wireReg: WireNodeRegistration,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
/** If [error] is null then the registration was successful. If not null then it wasn't and it explains why */
@CordaSerializable
data class RegistrationResponse(val success: Boolean)
data class RegistrationResponse(val error: String?)
class SubscribeRequest(val subscribe: Boolean,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
data class SubscribeRequest(val subscribe: Boolean,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
@CordaSerializable
data class SubscribeResponse(val confirmed: Boolean)
@ -107,7 +115,7 @@ interface NetworkMapService {
@ThreadSafe
class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) {
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
override val nodeRegistrations: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
init {
@ -122,9 +130,17 @@ class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkM
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
*/
@ThreadSafe
abstract class AbstractNetworkMapService
(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) {
protected abstract val registeredNodes: MutableMap<Party, NodeRegistrationInfo>
abstract class AbstractNetworkMapService(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) {
companion object {
/**
* Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a
* 10 times overhead.
*/
private const val MAX_SIZE_REGISTRATION_REQUEST_BYTES = 5500
private val logger = loggerFor<AbstractNetworkMapService>()
}
protected abstract val nodeRegistrations: MutableMap<Party, NodeRegistrationInfo>
// Map from subscriber address, to most recently acknowledged update map version.
protected abstract val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
@ -135,38 +151,19 @@ abstract class AbstractNetworkMapService
val mapVersion: Int
get() = _mapVersion.get()
private fun mapVersionIncrementAndGet(): Int = _mapVersion.incrementAndGet()
/** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */
val maxUnacknowledgedUpdates = 10
/**
* Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a
* 10 times overhead.
*/
val maxSizeRegistrationRequestBytes = 5500
private val handlers = ArrayList<MessageHandlerRegistration>()
// Filter reduces this to the entries that add a node to the map
override val nodes: List<NodeInfo>
get() = registeredNodes.mapNotNull { if (it.value.reg.type == AddOrRemove.ADD) it.value.reg.node else null }
protected fun setup() {
// Register message handlers
handlers += addMessageHandler(NetworkMapService.FETCH_TOPIC,
{ req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.QUERY_TOPIC,
{ req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.REGISTER_TOPIC,
{ req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_TOPIC,
{ req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) }
)
handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r ->
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
handlers += addMessageHandler(FETCH_TOPIC) { req: FetchMapRequest -> processFetchAllRequest(req) }
handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) }
handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) }
handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) }
handlers += net.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r ->
val req = message.data.deserialize<UpdateAcknowledge>()
processAcknowledge(req)
}
}
@ -193,135 +190,105 @@ abstract class AbstractNetworkMapService
subscribers.locked { remove(subscriber) }
}
@VisibleForTesting
fun getUnacknowledgedCount(subscriber: SingleMessageRecipient, mapVersion: Int): Int? {
return subscribers.locked {
val subscriberMapVersion = get(subscriber)?.mapVersion
if (subscriberMapVersion != null) {
mapVersion - subscriberMapVersion
} else {
null
private fun processAcknowledge(request: UpdateAcknowledge): Unit {
if (request.replyTo !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked {
val lastVersionAcked = this[request.replyTo]?.mapVersion
if ((lastVersionAcked ?: 0) < request.mapVersion) {
this[request.replyTo] = LastAcknowledgeInfo(request.mapVersion)
}
}
}
@VisibleForTesting
fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) {
private fun processFetchAllRequest(request: FetchMapRequest): FetchMapResponse {
if (request.subscribe) {
addSubscriber(request.replyTo)
}
val currentVersion = mapVersion
val nodeRegistrations = if (request.ifChangedSinceVersion == null || request.ifChangedSinceVersion < currentVersion) {
// We return back the current state of the entire map including nodes that have been removed
ArrayList(nodeRegistrations.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals
} else {
null
}
return FetchMapResponse(nodeRegistrations, currentVersion)
}
private fun processQueryRequest(request: QueryIdentityRequest): QueryIdentityResponse {
val candidate = nodeRegistrations[request.identity]?.reg
// If the most recent record we have is of the node being removed from the map, then it's considered
// as no match.
val node = if (candidate == null || candidate.type == REMOVE) null else candidate.node
return QueryIdentityResponse(node)
}
private fun processRegistrationRequest(request: RegistrationRequest): RegistrationResponse {
if (request.wireReg.raw.size > MAX_SIZE_REGISTRATION_REQUEST_BYTES) return RegistrationResponse("Request is too big")
val change = try {
request.wireReg.verified()
} catch(e: SignatureException) {
return RegistrationResponse("Invalid signature on request")
}
val node = change.node
// Update the current value atomically, so that if multiple updates come
// in on different threads, there is no risk of a race condition while checking
// sequence numbers.
val registrationInfo = try {
nodeRegistrations.compute(node.legalIdentity) { mapKey: Party, existing: NodeRegistrationInfo? ->
require(!((existing == null || existing.reg.type == REMOVE) && change.type == REMOVE)) {
"Attempting to de-register unknown node"
}
require(existing == null || existing.reg.serial < change.serial) { "Serial value is too small" }
NodeRegistrationInfo(change, _mapVersion.incrementAndGet())
}
} catch (e: IllegalArgumentException) {
return RegistrationResponse(e.message)
}
notifySubscribers(request.wireReg, registrationInfo!!.mapVersion)
// Update the local cache
// TODO: Once local messaging is fixed, this should go over the network layer as it does to other
// subscribers
when (change.type) {
ADD -> {
logger.info("Added node ${node.address} to network map")
services.networkMapCache.addNode(change.node)
}
REMOVE -> {
logger.info("Removed node ${node.address} from network map")
services.networkMapCache.removeNode(change.node)
}
}
return RegistrationResponse(null)
}
private fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) {
// TODO: Once we have a better established messaging system, we can probably send
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes
val message = net.createMessage(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID, update)
val message = net.createMessage(PUSH_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>()
forEach { subscriber: Map.Entry<SingleMessageRecipient, LastAcknowledgeInfo> ->
val unacknowledgedCount = mapVersion - subscriber.value.mapVersion
// TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst.
if (unacknowledgedCount <= maxUnacknowledgedUpdates) {
net.send(message, subscriber.key)
} else {
toRemove.add(subscriber.key)
}
}
toRemove.forEach { remove(it) }
// Remove any stale subscribers
values.removeIf { lastAckInfo -> mapVersion - lastAckInfo.mapVersion > maxUnacknowledgedUpdates }
// TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst.
keys.forEach { recipient -> net.send(message, recipient) }
}
}
@VisibleForTesting
fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit {
if (req.replyTo !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked {
val lastVersionAcked = this[req.replyTo]?.mapVersion
if ((lastVersionAcked ?: 0) < req.mapVersion) {
this[req.replyTo] = LastAcknowledgeInfo(req.mapVersion)
}
}
}
@VisibleForTesting
fun processFetchAllRequest(req: NetworkMapService.FetchMapRequest): NetworkMapService.FetchMapResponse {
if (req.subscribe) {
addSubscriber(req.replyTo)
}
val ver = mapVersion
if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) {
val nodes = ArrayList(registeredNodes.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals
return NetworkMapService.FetchMapResponse(nodes, ver)
private fun processSubscriptionRequest(request: SubscribeRequest): SubscribeResponse {
if (request.subscribe) {
addSubscriber(request.replyTo)
} else {
return NetworkMapService.FetchMapResponse(null, ver)
removeSubscriber(request.replyTo)
}
}
@VisibleForTesting
fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse {
val candidate = registeredNodes[req.identity]?.reg
// If the most recent record we have is of the node being removed from the map, then it's considered
// as no match.
if (candidate == null || candidate.type == AddOrRemove.REMOVE) {
return NetworkMapService.QueryIdentityResponse(null)
} else {
return NetworkMapService.QueryIdentityResponse(candidate.node)
}
}
@VisibleForTesting
fun processRegistrationChangeRequest(req: NetworkMapService.RegistrationRequest): NetworkMapService.RegistrationResponse {
require(req.wireReg.raw.size < maxSizeRegistrationRequestBytes)
val change: NodeRegistration
try {
change = req.wireReg.verified()
} catch(e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
val node = change.node
var changed: Boolean = false
// Update the current value atomically, so that if multiple updates come
// in on different threads, there is no risk of a race condition while checking
// sequence numbers.
val registrationInfo = registeredNodes.compute(node.legalIdentity, { mapKey: Party, existing: NodeRegistrationInfo? ->
changed = existing == null || existing.reg.serial < change.serial
if (changed) {
when (change.type) {
AddOrRemove.ADD -> NodeRegistrationInfo(change, mapVersionIncrementAndGet())
AddOrRemove.REMOVE -> NodeRegistrationInfo(change, mapVersionIncrementAndGet())
else -> throw NodeMapError.UnknownChangeType()
}
} else {
existing
}
})
if (changed) {
notifySubscribers(req.wireReg, registrationInfo!!.mapVersion)
// Update the local cache
// TODO: Once local messaging is fixed, this should go over the network layer as it does to other
// subscribers
when (change.type) {
AddOrRemove.ADD -> {
NetworkMapService.logger.info("Added node ${node.address} to network map")
services.networkMapCache.addNode(change.node)
}
AddOrRemove.REMOVE -> {
NetworkMapService.logger.info("Removed node ${node.address} from network map")
services.networkMapCache.removeNode(change.node)
}
}
}
return NetworkMapService.RegistrationResponse(changed)
}
@VisibleForTesting
fun processSubscriptionRequest(req: NetworkMapService.SubscribeRequest): NetworkMapService.SubscribeResponse {
when (req.subscribe) {
false -> removeSubscriber(req.replyTo)
true -> addSubscriber(req.replyTo)
}
return NetworkMapService.SubscribeResponse(true)
return SubscribeResponse(true)
}
}
@ -339,7 +306,7 @@ abstract class AbstractNetworkMapService
// TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node
// involves providing both node and paerty, and deregistering a node involves a request with party but no node.
@CordaSerializable
class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
data class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
/**
* Build a node registration in wire format.
*/
@ -372,9 +339,6 @@ sealed class NodeMapError : Exception() {
/** Thrown if the replyTo of a subscription change message is not a single message recipient */
class InvalidSubscriber : NodeMapError()
/** Thrown if a change arrives which is of an unknown type */
class UnknownChangeType : NodeMapError()
}
@CordaSerializable

View File

@ -23,7 +23,7 @@ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetwor
val registrationInfo = blob("node_registration_info")
}
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = synchronizedMap(object : AbstractJDBCHashMap<Party, NodeRegistrationInfo, Table>(Table, loadOnInit = true) {
override val nodeRegistrations: MutableMap<Party, NodeRegistrationInfo> = synchronizedMap(object : AbstractJDBCHashMap<Party, NodeRegistrationInfo, Table>(Table, loadOnInit = true) {
override fun keyFromRow(row: ResultRow): Party = Party(row[table.nodeParty.name], row[table.nodeParty.owningKey])
override fun valueFromRow(row: ResultRow): NodeRegistrationInfo = deserializeFromBlob(row[table.registrationInfo])
@ -42,7 +42,7 @@ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetwor
init {
// Initialise the network map version with the current highest persisted version, or zero if there are no entries.
_mapVersion.set(registeredNodes.values.map { it.mapVersion }.max() ?: 0)
_mapVersion.set(nodeRegistrations.values.map { it.mapVersion }.max() ?: 0)
setup()
}
}

View File

@ -0,0 +1,271 @@
package net.corda.node.services
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.getOrThrow
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.send
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.deserialize
import net.corda.flows.sendRequest
import net.corda.node.services.AbstractNetworkMapServiceTest.Changed.Added
import net.corda.node.services.AbstractNetworkMapServiceTest.Changed.Removed
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.AbstractNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AddOrRemove.REMOVE
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.assertj.core.api.Assertions.assertThat
import org.eclipse.jetty.util.BlockingArrayQueue
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.math.BigInteger
import java.security.KeyPair
import java.time.Instant
abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService> {
lateinit var network: MockNetwork
lateinit var mapServiceNode: MockNode
lateinit var alice: MockNode
@Before
fun setup() {
network = MockNetwork(defaultFactory = nodeFactory)
network.createTwoNodes(firstNodeName = "map service", secondNodeName = "alice").apply {
mapServiceNode = first
alice = second
}
network.runNetwork()
}
@After
fun tearDown() {
network.stopNodes()
}
protected abstract val nodeFactory: MockNetwork.Factory
protected abstract val networkMapService: S
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
protected abstract fun swizzle()
@Test
fun `all nodes register themselves`() {
// setup has run the network and so we immediately expect the network map service to be correctly populated
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice))
assertThat(alice.identityQuery()).isEqualTo(alice.info)
assertThat(mapServiceNode.identityQuery()).isEqualTo(mapServiceNode.info)
}
@Test
fun `re-register the same node`() {
val response = alice.registration(ADD)
swizzle()
assertThat(response.getOrThrow().error).isNull()
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) // Confirm it's a no-op
}
@Test
fun `re-register with smaller serial value`() {
val response = alice.registration(ADD, serial = 1)
swizzle()
assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice)) // Confirm it's a no-op
}
@Test
fun `de-register node`() {
val response = alice.registration(REMOVE)
swizzle()
assertThat(response.getOrThrow().error).isNull()
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Removed(alice))
swizzle()
assertThat(alice.identityQuery()).isNull()
assertThat(mapServiceNode.identityQuery()).isEqualTo(mapServiceNode.info)
}
@Test
fun `de-register same node again`() {
alice.registration(REMOVE)
val response = alice.registration(REMOVE)
swizzle()
assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Removed(alice))
}
@Test
fun `de-register unknown node`() {
val bob = newNodeSeparateFromNetworkMap("Bob")
val response = bob.registration(REMOVE)
swizzle()
assertThat(response.getOrThrow().error).isNotNull() // Make sure send error message is sent back
assertThat(alice.fetchMap()).containsOnly(Added(mapServiceNode), Added(alice))
}
@Test
fun `subscribed while new node registers`() {
val updates = alice.subscribe()
swizzle()
val bob = addNewNodeToNetworkMap("Bob")
swizzle()
val update = updates.single()
assertThat(update.mapVersion).isEqualTo(networkMapService.mapVersion)
assertThat(update.wireReg.verified().toChanged()).isEqualTo(Added(bob.info))
}
@Test
fun `subscribed while node de-registers`() {
val bob = addNewNodeToNetworkMap("Bob")
val updates = alice.subscribe()
bob.registration(REMOVE)
swizzle()
assertThat(updates.map { it.wireReg.verified().toChanged() }).containsOnly(Removed(bob.info))
}
@Test
fun unsubscribe() {
val updates = alice.subscribe()
val bob = addNewNodeToNetworkMap("Bob")
alice.unsubscribe()
addNewNodeToNetworkMap("Charlie")
swizzle()
assertThat(updates.map { it.wireReg.verified().toChanged() }).containsOnly(Added(bob.info))
}
@Test
fun `surpass unacknowledged update limit`() {
val subscriber = newNodeSeparateFromNetworkMap("Subscriber")
val updates = subscriber.subscribe()
val bob = addNewNodeToNetworkMap("Bob")
var serial = updates.first().wireReg.verified().serial
repeat(networkMapService.maxUnacknowledgedUpdates) {
bob.registration(ADD, serial = ++serial)
swizzle()
}
// We sent maxUnacknowledgedUpdates + 1 updates - the last one will be missed
assertThat(updates).hasSize(networkMapService.maxUnacknowledgedUpdates)
}
@Test
fun `delay sending update ack until just before unacknowledged update limit`() {
val subscriber = newNodeSeparateFromNetworkMap("Subscriber")
val updates = subscriber.subscribe()
val bob = addNewNodeToNetworkMap("Bob")
var serial = updates.first().wireReg.verified().serial
repeat(networkMapService.maxUnacknowledgedUpdates - 1) {
bob.registration(ADD, serial = ++serial)
swizzle()
}
// Subscriber will receive maxUnacknowledgedUpdates updates before sending ack
subscriber.ackUpdate(updates.last().mapVersion)
swizzle()
bob.registration(ADD, serial = ++serial)
assertThat(updates).hasSize(networkMapService.maxUnacknowledgedUpdates + 1)
assertThat(updates.last().wireReg.verified().serial).isEqualTo(serial)
}
private fun MockNode.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List<Changed> {
val request = FetchMapRequest(subscribe, ifChangedSinceVersion, info.address)
val response = services.networkService.sendRequest<FetchMapResponse>(FETCH_TOPIC, request, mapServiceNode.info.address)
network.runNetwork()
return response.getOrThrow().nodes?.map { it.toChanged() } ?: emptyList()
}
private fun NodeRegistration.toChanged(): Changed = when (type) {
ADD -> Added(node)
REMOVE -> Removed(node)
}
private fun MockNode.identityQuery(): NodeInfo? {
val request = QueryIdentityRequest(info.legalIdentity, info.address)
val response = services.networkService.sendRequest<QueryIdentityResponse>(QUERY_TOPIC, request, mapServiceNode.info.address)
network.runNetwork()
return response.getOrThrow().node
}
private fun MockNode.registration(addOrRemove: AddOrRemove,
serial: Long = System.currentTimeMillis()): ListenableFuture<RegistrationResponse> {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val nodeRegistration = NodeRegistration(info, serial, addOrRemove, expires)
val request = RegistrationRequest(nodeRegistration.toWire(services.legalIdentityKey.private), info.address)
val response = services.networkService.sendRequest<RegistrationResponse>(REGISTER_TOPIC, request, mapServiceNode.info.address)
network.runNetwork()
return response
}
private fun MockNode.subscribe(): List<Update> {
val request = SubscribeRequest(true, info.address)
val updates = BlockingArrayQueue<Update>()
services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r ->
updates += message.data.deserialize<Update>()
}
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address)
network.runNetwork()
assertThat(response.getOrThrow().confirmed).isTrue()
return updates
}
private fun MockNode.unsubscribe() {
val request = SubscribeRequest(false, info.address)
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address)
network.runNetwork()
assertThat(response.getOrThrow().confirmed).isTrue()
}
private fun MockNode.ackUpdate(mapVersion: Int) {
val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.info.address)
network.runNetwork()
}
private fun addNewNodeToNetworkMap(legalName: String): MockNode {
val node = network.createNode(networkMapAddress = mapServiceNode.info.address, legalName = legalName)
network.runNetwork()
return node
}
private fun newNodeSeparateFromNetworkMap(legalName: String): MockNode {
return network.createNode(legalName = legalName, nodeFactory = NoNMSNodeFactory)
}
sealed class Changed(val node: NodeInfo) {
override fun equals(other: Any?): Boolean = other?.javaClass == this.javaClass && (other as Changed).node == this.node
override fun hashCode(): Int = node.hashCode()
override fun toString(): String = "${javaClass.simpleName}($node)"
class Added(node: NodeInfo) : Changed(node) {
constructor(node: MockNode) : this(node.info)
}
class Removed(node: NodeInfo) : Changed(node) {
constructor(node: MockNode) : this(node.info)
}
}
private object NoNMSNodeFactory : MockNetwork.Factory {
override fun create(config: NodeConfiguration,
network: MockNetwork,
networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>,
id: Int,
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNode {
return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun makeNetworkMapService() {}
}
}
}
}

View File

@ -1,221 +1,10 @@
package net.corda.node.services
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.getOrThrow
import net.corda.core.map
import net.corda.core.messaging.send
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.flows.sendRequest
import net.corda.node.services.network.AbstractNetworkMapService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.junit.Before
import org.junit.Test
import java.security.PrivateKey
import java.time.Instant
import java.util.concurrent.Future
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
/**
* Abstracted out test logic to be re-used by [PersistentNetworkMapServiceTest].
*/
abstract class AbstractNetworkMapServiceTest {
protected fun success(mapServiceNode: MockNode,
registerNode: MockNode,
service: () -> AbstractNetworkMapService,
swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
swizzle()
// Confirm the service contains no nodes as own node only registered if network is run.
assertEquals(0, service().nodes.count())
assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Register the new node
val instant = Instant.now()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val nodeKey = registerNode.services.legalIdentityKey
val addChange = NodeRegistration(registerNode.info, instant.toEpochMilli(), AddOrRemove.ADD, expires)
val addWireChange = addChange.toWire(nodeKey.private)
service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
swizzle()
assertEquals(1, service().nodes.count())
assertEquals(registerNode.info, service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Re-registering should be a no-op
service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
swizzle()
assertEquals(1, service().nodes.count())
// Confirm that de-registering the node succeeds and drops it from the node lists
val removeChange = NodeRegistration(registerNode.info, instant.toEpochMilli() + 1, AddOrRemove.REMOVE, expires)
val removeWireChange = removeChange.toWire(nodeKey.private)
assert(service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
swizzle()
assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.legalIdentity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
swizzle()
// Trying to de-register a node that doesn't exist should fail
assert(!service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
}
protected fun `success with network`(network: MockNetwork,
mapServiceNode: MockNode,
registerNode: MockNode,
swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
swizzle()
// Confirm all nodes have registered themselves
network.runNetwork()
var fetchResult = registerNode.fetchMap(mapServiceNode, false)
network.runNetwork()
assertEquals(2, fetchResult.getOrThrow()?.count())
// Forcibly deregister the second node
val nodeKey = registerNode.services.legalIdentityKey
val instant = Instant.now()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(registerNode.info, instant.toEpochMilli() + 1, AddOrRemove.REMOVE, expires)
val registerResult = registerNode.registration(mapServiceNode, reg, nodeKey.private)
network.runNetwork()
assertTrue(registerResult.getOrThrow().success)
swizzle()
// Now only map service node should be registered
fetchResult = registerNode.fetchMap(mapServiceNode, false)
network.runNetwork()
assertEquals(mapServiceNode.info, fetchResult.getOrThrow()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
}
protected fun `subscribe with network`(network: MockNetwork,
mapServiceNode: MockNode,
registerNode: MockNode,
service: () -> AbstractNetworkMapService,
swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
swizzle()
// Test subscribing to updates
network.runNetwork()
val subscribeResult = registerNode.subscribe(mapServiceNode, true)
network.runNetwork()
subscribeResult.getOrThrow()
swizzle()
val startingMapVersion = service().mapVersion
// Check the unacknowledged count is zero
assertEquals(0, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion))
// Fire off an update
val nodeKey = registerNode.services.legalIdentityKey
var seq = 0L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
var wireReg = reg.toWire(nodeKey.private)
service().notifySubscribers(wireReg, startingMapVersion + 1)
swizzle()
// Check the unacknowledged count is one
assertEquals(1, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
// Send in an acknowledgment and verify the count goes down
registerNode.updateAcknowlege(mapServiceNode, startingMapVersion + 1)
network.runNetwork()
swizzle()
assertEquals(0, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
// Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit
// is hit. On the last iteration overflow the pending list, and check the node is unsubscribed
for (i in 0..service().maxUnacknowledgedUpdates) {
reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
wireReg = reg.toWire(nodeKey.private)
service().notifySubscribers(wireReg, i + startingMapVersion + 2)
swizzle()
if (i < service().maxUnacknowledgedUpdates) {
assertEquals(i + 1, service().getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2))
} else {
assertNull(service().getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2))
}
}
}
private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<RegistrationResponse> {
val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress)
return services.networkService.sendRequest(NetworkMapService.REGISTER_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture<SubscribeResponse> {
val req = SubscribeRequest(subscribe, services.networkService.myAddress)
return services.networkService.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) {
val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
}
private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
val net = services.networkService
val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress)
return net.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
}
}
class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
lateinit var network: MockNetwork
@Before
fun setup() {
network = MockNetwork()
}
/**
* Perform basic tests of registering, de-registering and fetching the full network map.
*/
@Test
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService
databaseTransaction(mapServiceNode.database) {
success(mapServiceNode, registerNode, { service }, { })
}
}
@Test
fun `success with network`() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
// Confirm there's a network map service on node 0
assertNotNull(mapServiceNode.inNodeNetworkMapService)
`success with network`(network, mapServiceNode, registerNode, { })
}
@Test
fun `subscribe with network`() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService)
`subscribe with network`(network, mapServiceNode, registerNode, { service }, { })
}
class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest<InMemoryNetworkMapService>() {
override val nodeFactory: MockNetwork.Factory get() = MockNetwork.DefaultFactory
override val networkMapService: InMemoryNetworkMapService get() = mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService
override fun swizzle() = Unit
}

View File

@ -1,67 +1,43 @@
package net.corda.node.services
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.AbstractNetworkMapService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Before
import org.junit.Test
import net.corda.testing.node.MockNetwork.MockNode
import java.math.BigInteger
import java.security.KeyPair
import java.security.KeyPairGeneratorSpi
/**
* This class mirrors [InMemoryNetworkMapServiceTest] but switches in a [PersistentNetworkMapService] and
* repeatedly replaces it with new instances to check that the service correctly restores the most recent state.
*/
class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
lateinit var network: MockNetwork
class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest<PersistentNetworkMapService>() {
@Before
fun setup() {
network = MockNetwork()
}
override val nodeFactory: MockNetwork.Factory get() = NodeFactory
@After
fun tearDown() {
network.stopNodes()
}
override val networkMapService: PersistentNetworkMapService
get() = (mapServiceNode.inNodeNetworkMapService as SwizzleNetworkMapService).delegate
/**
* We use a special [NetworkMapService] that allows us to switch in a new instance at any time to check that the
* state within it is correctly restored.
*/
private class SwizzleNetworkMapService(services: ServiceHubInternal) : NetworkMapService {
var delegate: AbstractNetworkMapService = InMemoryNetworkMapService(services)
override val nodes: List<NodeInfo>
get() = delegate.nodes
fun swizzle() {
delegate.unregisterNetworkHandlers()
delegate = makeNetworkMapService(delegate.services)
}
private fun makeNetworkMapService(services: ServiceHubInternal): AbstractNetworkMapService {
return PersistentNetworkMapService(services)
override fun swizzle() {
databaseTransaction(mapServiceNode.database) {
(mapServiceNode.inNodeNetworkMapService as SwizzleNetworkMapService).swizzle()
}
}
private object NodeFactory : MockNetwork.Factory {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int,
override fun create(config: NodeConfiguration,
network: MockNetwork,
networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>,
id: Int,
overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
entropyRoot: BigInteger): MockNode {
return object : MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun makeNetworkMapService() {
inNodeNetworkMapService = SwizzleNetworkMapService(services)
}
@ -70,42 +46,15 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
}
/**
* Perform basic tests of registering, de-registering and fetching the full network map.
*
* TODO: make the names of these and those in [AbstractNetworkMapServiceTest] and [InMemoryNetworkMapServiceTest] more
* meaningful.
* We use a special [NetworkMapService] that allows us to switch in a new instance at any time to check that the
* state within it is correctly restored.
*/
@Test
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory)
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
private class SwizzleNetworkMapService(val services: ServiceHubInternal) : NetworkMapService {
var delegate: PersistentNetworkMapService = PersistentNetworkMapService(services)
databaseTransaction(mapServiceNode.database) {
success(mapServiceNode, registerNode, { service.delegate }, { service.swizzle() })
}
}
@Test
fun `success with network`() {
val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory)
// Confirm there's a network map service on node 0
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
databaseTransaction(mapServiceNode.database) {
`success with network`(network, mapServiceNode, registerNode, { service.swizzle() })
}
}
@Test
fun `subscribe with network`() {
val (mapServiceNode, registerNode) = network.createTwoNodes(NodeFactory)
// Confirm there's a network map service on node 0
val service = mapServiceNode.inNodeNetworkMapService!! as SwizzleNetworkMapService
databaseTransaction(mapServiceNode.database) {
`subscribe with network`(network, mapServiceNode, registerNode, { service.delegate }, { service.swizzle() })
fun swizzle() {
delegate.unregisterNetworkHandlers()
delegate = PersistentNetworkMapService(services)
}
}
}

View File

@ -304,7 +304,10 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
* Sets up a two node network, in which the first node runs network map and notary services and the other
* doesn't.
*/
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
fun createTwoNodes(firstNodeName: String? = null,
secondNodeName: String? = null,
nodeFactory: Factory = defaultFactory,
notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
require(nodes.isEmpty())
val notaryServiceInfo = ServiceInfo(SimpleNotaryService.type)
val notaryOverride = if (notaryKeyPair != null)
@ -312,8 +315,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
else
null
return Pair(
createNode(null, -1, nodeFactory, true, null, notaryOverride, BigInteger.valueOf(random63BitValue()), ServiceInfo(NetworkMapService.type), notaryServiceInfo),
createNode(nodes[0].info.address, -1, nodeFactory, true, null)
createNode(null, -1, nodeFactory, true, firstNodeName, notaryOverride, BigInteger.valueOf(random63BitValue()), ServiceInfo(NetworkMapService.type), notaryServiceInfo),
createNode(nodes[0].info.address, -1, nodeFactory, true, secondNodeName)
)
}