Merged in cor-344-network-map-persistence-prep (pull request #335)

Refactor network map service in preparation for persistence.
This commit is contained in:
Rick Parker 2016-09-06 18:22:13 +01:00
commit c6fd467fb5
4 changed files with 97 additions and 53 deletions

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.CityDatabase
@ -21,6 +20,7 @@ import com.r3corda.core.seconds
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.api.APIServer import com.r3corda.node.api.APIServer
import com.r3corda.node.services.api.* import com.r3corda.node.services.api.*
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.Contract import com.r3corda.core.contracts.Contract
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send import com.r3corda.core.messaging.send
@ -65,9 +64,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r -> net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, null) { message, r ->
try { try {
val req = message.data.deserialize<NetworkMapService.Update>() val req = message.data.deserialize<NetworkMapService.Update>()
val hash = SecureHash.sha256(req.wireReg.serialize().bits)
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID,
NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits) NetworkMapService.UpdateAcknowledge(req.mapVersion, net.myAddress).serialize().bits)
net.send(ackMessage, req.replyTo) net.send(ackMessage, req.replyTo)
processUpdatePush(req) processUpdatePush(req)
} catch(e: NodeMapError) { } catch(e: NodeMapError) {

View File

@ -1,8 +1,11 @@
package com.r3corda.node.services.network package com.r3corda.node.services.network
import co.paralleluniverse.common.util.VisibleForTesting import com.google.common.annotations.VisibleForTesting
import com.r3corda.core.ThreadBox import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.* import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SignedData
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
@ -16,6 +19,7 @@ import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.ServiceRequestMessage import com.r3corda.protocols.ServiceRequestMessage
import kotlinx.support.jdk8.collections.compute
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.security.PrivateKey import java.security.PrivateKey
import java.security.SignatureException import java.security.SignatureException
@ -73,17 +77,43 @@ interface NetworkMapService {
data class RegistrationResponse(val success: Boolean) data class RegistrationResponse(val success: Boolean)
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo) class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
data class SubscribeResponse(val confirmed: Boolean) data class SubscribeResponse(val confirmed: Boolean)
data class Update(val wireReg: WireNodeRegistration, val replyTo: MessageRecipients) data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
data class UpdateAcknowledge(val wireRegHash: SecureHash, val replyTo: MessageRecipients) data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
} }
@ThreadSafe @ThreadSafe
class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net, cache) { class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, cache: NetworkMapCache) :
private val registeredNodes = ConcurrentHashMap<Party, NodeRegistration>() AbstractNetworkMapService(net, cache) {
// Map from subscriber address, to a list of unacknowledged updates
private val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, MutableList<SecureHash>>()) override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
private val mapVersion = AtomicInteger(1) override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
init {
setup(home)
}
}
/**
* Abstracted out core functionality as the basis for a persistent implementation, as well as existing in-memory implementation.
*
* Design is slightly refactored to track time and map version of last acknowledge per subscriber to facilitate
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
*/
@ThreadSafe
abstract class AbstractNetworkMapService(net: MessagingService, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net, cache) {
protected abstract val registeredNodes: MutableMap<Party, NodeRegistrationInfo>
// Map from subscriber address, to most recently acknowledged update map version.
protected abstract val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
protected val _mapVersion = AtomicInteger(0)
@VisibleForTesting
val mapVersion: Int
get() = _mapVersion.get()
private fun mapVersionIncrementAndGet(): Int = _mapVersion.incrementAndGet()
/** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */ /** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */
val maxUnacknowledgedUpdates = 10 val maxUnacknowledgedUpdates = 10
/** /**
@ -94,12 +124,13 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
// Filter reduces this to the entries that add a node to the map // Filter reduces this to the entries that add a node to the map
override val nodes: List<NodeInfo> override val nodes: List<NodeInfo>
get() = registeredNodes.mapNotNull { if (it.value.type == AddOrRemove.ADD) it.value.node else null } get() = registeredNodes.mapNotNull { if (it.value.reg.type == AddOrRemove.ADD) it.value.reg.node else null }
init { protected fun setup(home: NodeRegistration) {
// Register the local node with the service // Register the local node with the service
val homeIdentity = home.node.identity val homeIdentity = home.node.identity
registeredNodes[homeIdentity] = home val registrationInfo = NodeRegistrationInfo(home, mapVersionIncrementAndGet())
registeredNodes[homeIdentity] = registrationInfo
// Register message handlers // Register message handlers
addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC, addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC,
@ -118,13 +149,15 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>() val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
processAcknowledge(req) processAcknowledge(req)
} }
// TODO: notify subscribers of name service registration. Network service is not up, so how?
} }
private fun addSubscriber(subscriber: MessageRecipients) { private fun addSubscriber(subscriber: MessageRecipients) {
if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber() if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked { subscribers.locked {
if (!containsKey(subscriber)) { if (!containsKey(subscriber)) {
put(subscriber, mutableListOf<SecureHash>()) put(subscriber, LastAcknowledgeInfo(mapVersion))
} }
} }
} }
@ -135,24 +168,31 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
} }
@VisibleForTesting @VisibleForTesting
fun getUnacknowledgedCount(subscriber: SingleMessageRecipient): Int? fun getUnacknowledgedCount(subscriber: SingleMessageRecipient, mapVersion: Int): Int? {
= subscribers.locked { get(subscriber)?.count() } return subscribers.locked {
val subscriberMapVersion = get(subscriber)?.mapVersion
if (subscriberMapVersion != null) {
mapVersion - subscriberMapVersion
} else {
null
}
}
}
@VisibleForTesting @VisibleForTesting
fun notifySubscribers(wireReg: WireNodeRegistration) { fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) {
// TODO: Once we have a better established messaging system, we can probably send // TODO: Once we have a better established messaging system, we can probably send
// to a MessageRecipientGroup that nodes join/leave, rather than the network map // to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group // service itself managing the group
val update = NetworkMapService.Update(wireReg, net.myAddress).serialize().bits val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bits
val message = net.createMessage(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, update) val message = net.createMessage(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked { subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>() val toRemove = mutableListOf<SingleMessageRecipient>()
val hash = SecureHash.sha256(wireReg.raw.bits) forEach { subscriber: Map.Entry<SingleMessageRecipient, LastAcknowledgeInfo> ->
forEach { subscriber: Map.Entry<SingleMessageRecipient, MutableList<SecureHash>> -> val unacknowledgedCount = mapVersion - subscriber.value.mapVersion
val unacknowledged = subscriber.value // TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst.
if (unacknowledged.count() < maxUnacknowledgedUpdates) { if (unacknowledgedCount <= maxUnacknowledgedUpdates) {
unacknowledged.add(hash)
net.send(message, subscriber.key) net.send(message, subscriber.key)
} else { } else {
toRemove.add(subscriber.key) toRemove.add(subscriber.key)
@ -164,8 +204,12 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
@VisibleForTesting @VisibleForTesting
fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit { fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit {
if (req.replyTo !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber()
subscribers.locked { subscribers.locked {
this[req.replyTo]?.remove(req.wireRegHash) val lastVersionAcked = this[req.replyTo]?.mapVersion
if ((lastVersionAcked ?: 0) < req.mapVersion) {
this[req.replyTo] = LastAcknowledgeInfo(req.mapVersion)
}
} }
} }
@ -174,9 +218,9 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
if (req.subscribe) { if (req.subscribe) {
addSubscriber(req.replyTo) addSubscriber(req.replyTo)
} }
val ver = mapVersion.get() val ver = mapVersion
if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) { if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) {
val nodes = ArrayList(registeredNodes.values) // Snapshot to avoid attempting to serialise ConcurrentHashMap internals val nodes = ArrayList(registeredNodes.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals
return NetworkMapService.FetchMapResponse(nodes, ver) return NetworkMapService.FetchMapResponse(nodes, ver)
} else { } else {
return NetworkMapService.FetchMapResponse(null, ver) return NetworkMapService.FetchMapResponse(null, ver)
@ -185,7 +229,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
@VisibleForTesting @VisibleForTesting
fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse { fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse {
val candidate = registeredNodes[req.identity] val candidate = registeredNodes[req.identity]?.reg
// If the most recent record we have is of the node being removed from the map, then it's considered // If the most recent record we have is of the node being removed from the map, then it's considered
// as no match. // as no match.
@ -212,12 +256,12 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
// Update the current value atomically, so that if multiple updates come // Update the current value atomically, so that if multiple updates come
// in on different threads, there is no risk of a race condition while checking // in on different threads, there is no risk of a race condition while checking
// sequence numbers. // sequence numbers.
registeredNodes.compute(node.identity, { mapKey: Party, existing: NodeRegistration? -> val registrationInfo = registeredNodes.compute(node.identity, { mapKey: Party, existing: NodeRegistrationInfo? ->
changed = existing == null || existing.serial < change.serial changed = existing == null || existing.reg.serial < change.serial
if (changed) { if (changed) {
when (change.type) { when (change.type) {
AddOrRemove.ADD -> change AddOrRemove.ADD -> NodeRegistrationInfo(change, mapVersionIncrementAndGet())
AddOrRemove.REMOVE -> change AddOrRemove.REMOVE -> NodeRegistrationInfo(change, mapVersionIncrementAndGet())
else -> throw NodeMapError.UnknownChangeType() else -> throw NodeMapError.UnknownChangeType()
} }
} else { } else {
@ -225,7 +269,7 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
} }
}) })
if (changed) { if (changed) {
notifySubscribers(req.wireReg) notifySubscribers(req.wireReg, registrationInfo!!.mapVersion)
// Update the local cache // Update the local cache
// TODO: Once local messaging is fixed, this should go over the network layer as it does to other // TODO: Once local messaging is fixed, this should go over the network layer as it does to other
@ -241,7 +285,6 @@ class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, v
} }
} }
mapVersion.incrementAndGet()
} }
return NetworkMapService.RegistrationResponse(changed) return NetworkMapService.RegistrationResponse(changed)
} }
@ -304,3 +347,6 @@ sealed class NodeMapError : Exception() {
/** Thrown if a change arrives which is of an unknown type */ /** Thrown if a change arrives which is of an unknown type */
class UnknownChangeType : NodeMapError() class UnknownChangeType : NodeMapError()
} }
data class LastAcknowledgeInfo(val mapVersion: Int)
data class NodeRegistrationInfo(val reg: NodeRegistration, val mapVersion: Int)

View File

@ -1,15 +1,14 @@
package com.r3corda.node.services package com.r3corda.node.services
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue import com.r3corda.core.random63BitValue
import com.r3corda.testing.node.MockNetwork
import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.testing.node.MockNetwork
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.security.PrivateKey import java.security.PrivateKey
@ -63,11 +62,11 @@ class InMemoryNetworkMapServiceTest {
assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
} }
class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash) : ProtocolLogic<Unit>() { class TestAcknowledgePSM(val server: NodeInfo, val mapVersion: Int) : ProtocolLogic<Unit>() {
override val topic: String get() = NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC override val topic: String get() = NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC
@Suspendable @Suspendable
override fun call() { override fun call() {
val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress) val req = NetworkMapService.UpdateAcknowledge(mapVersion, serviceHub.networkService.myAddress)
send(server.identity, 0, req) send(server.identity, 0, req)
} }
} }
@ -145,39 +144,40 @@ class InMemoryNetworkMapServiceTest {
network.runNetwork() network.runNetwork()
subscribePsm.get() subscribePsm.get()
val startingMapVersion = service.mapVersion
// Check the unacknowledged count is zero // Check the unacknowledged count is zero
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address)) assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address, startingMapVersion))
// Fire off an update // Fire off an update
val nodeKey = registerNode.storage.myLegalIdentityKey val nodeKey = registerNode.storage.myLegalIdentityKey
var seq = 1L var seq = 0L
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) var reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
var wireReg = reg.toWire(nodeKey.private) var wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg) service.notifySubscribers(wireReg, startingMapVersion + 1)
// Check the unacknowledged count is one // Check the unacknowledged count is one
assertEquals(1, service.getUnacknowledgedCount(registerNode.info.address)) assertEquals(1, service.getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
// Send in an acknowledgment and verify the count goes down // Send in an acknowledgment and verify the count goes down
val hash = SecureHash.sha256(wireReg.raw.bits)
val acknowledgePsm = registerNode.services.startProtocol(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, val acknowledgePsm = registerNode.services.startProtocol(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
TestAcknowledgePSM(mapServiceNode.info, hash)) TestAcknowledgePSM(mapServiceNode.info, startingMapVersion + 1))
network.runNetwork() network.runNetwork()
acknowledgePsm.get() acknowledgePsm.get()
assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address)) assertEquals(0, service.getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
// Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit // Intentionally fill the pending acknowledgements to verify it doesn't drop subscribers before the limit
// is hit. On the last iteration overflow the pending list, and check the node is unsubscribed // is hit. On the last iteration overflow the pending list, and check the node is unsubscribed
for (i in 0..service.maxUnacknowledgedUpdates) { for (i in 0..service.maxUnacknowledgedUpdates) {
reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires) reg = NodeRegistration(registerNode.info, seq++, AddOrRemove.ADD, expires)
wireReg = reg.toWire(nodeKey.private) wireReg = reg.toWire(nodeKey.private)
service.notifySubscribers(wireReg) service.notifySubscribers(wireReg, i + startingMapVersion + 2)
if (i < service.maxUnacknowledgedUpdates) { if (i < service.maxUnacknowledgedUpdates) {
assertEquals(i + 1, service.getUnacknowledgedCount(registerNode.info.address)) assertEquals(i + 1, service.getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2))
} else { } else {
assertNull(service.getUnacknowledgedCount(registerNode.info.address)) assertNull(service.getUnacknowledgedCount(registerNode.info.address, i + startingMapVersion + 2))
} }
} }
} }