Removed unused ServiceRequestMessage class hierarchy and added sendRequest method

This commit is contained in:
Shams Asari 2016-10-04 18:20:51 +01:00
parent 307c93858b
commit c3f824001d
7 changed files with 85 additions and 109 deletions

View File

@ -1,35 +1,30 @@
package com.r3corda.protocols package com.r3corda.protocols
import com.r3corda.core.crypto.Party import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.messaging.onNext
import com.r3corda.core.messaging.send
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import java.util.concurrent.Executor
/** /**
* Abstract superclass for request messages sent to services, which includes common * Abstract superclass for request messages sent to services which expect a reply.
* fields such as replyTo and sessionID.
*/ */
interface ServiceRequestMessage { interface ServiceRequestMessage {
val sessionID: Long val sessionID: Long
fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients val replyTo: SingleMessageRecipient
} }
/** /**
* A message which specifies reply destination as a specific endpoint such as a monitoring client. This is of particular * Sends a [ServiceRequestMessage] to [target] and returns a [ListenableFuture] of the response.
* use where we want to address a specific endpoint, not necessarily a specific user (for example if the same user logs * @param R The type of the response.
* in on two machines, we want to consistently deliver messages as part of a session, to the same machine the session
* started on).
*/ */
interface DirectRequestMessage: ServiceRequestMessage { fun <R : Any> MessagingService.sendRequest(topic: String,
val replyToRecipient: SingleMessageRecipient request: ServiceRequestMessage,
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyToRecipient target: SingleMessageRecipient,
} executor: Executor? = null): ListenableFuture<R> {
val responseFuture = onNext<R>(topic, request.sessionID, executor)
interface PartyRequestMessage : ServiceRequestMessage { send(topic, DEFAULT_SESSION_ID, request, target)
return responseFuture
val replyToParty: Party
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients {
return networkMapCache.partyNodes.single { it.identity == replyToParty }.address
}
} }

View File

@ -8,8 +8,6 @@ import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.X509Utilities import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.messaging.onNext
import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
@ -18,7 +16,6 @@ import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds import com.r3corda.core.seconds
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
@ -49,6 +46,7 @@ import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.node.services.vault.CashBalanceAsMetricsObserver import com.r3corda.node.services.vault.CashBalanceAsMetricsObserver
import com.r3corda.node.services.vault.NodeVaultService import com.r3corda.node.services.vault.NodeVaultService
import com.r3corda.node.utilities.* import com.r3corda.node.utilities.*
import com.r3corda.protocols.sendRequest
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger import org.slf4j.Logger
import java.nio.file.FileAlreadyExistsException import java.nio.file.FileAlreadyExistsException
@ -366,12 +364,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
val instant = platformClock.instant() val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires) val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires)
val sessionID = random63BitValue() val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress)
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) return net.sendRequest(REGISTER_PROTOCOL_TOPIC, request, networkMapAddr, RunOnCallerThread)
val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits)
val future = net.onNext<RegistrationResponse>(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread)
net.send(message, networkMapAddr)
return future
} }
protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(setOf(storage.myLegalIdentityKey)) protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(setOf(storage.myLegalIdentityKey))

View File

@ -38,7 +38,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
// If the return type R is Unit, then do not send a response // If the return type R is Unit, then do not send a response
if (response.javaClass != Unit.javaClass) { if (response.javaClass != Unit.javaClass) {
val msg = net.createMessage(topic, request.sessionID, response.serialize().bits) val msg = net.createMessage(topic, request.sessionID, response.serialize().bits)
net.send(msg, request.getReplyTo(services.networkMapCache)) net.send(msg, request.replyTo)
} }
} catch(e: Exception) { } catch(e: Exception) {
exceptionConsumer(message, e) exceptionConsumer(message, e)

View File

@ -7,7 +7,9 @@ import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.Contract import com.r3corda.core.contracts.Contract
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.map import com.r3corda.core.map
import com.r3corda.core.messaging.* import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkCacheError import com.r3corda.core.node.services.NetworkCacheError
@ -15,7 +17,6 @@ import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.NetworkMapCache.MapChange import com.r3corda.core.node.services.NetworkMapCache.MapChange
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
@ -26,6 +27,7 @@ import com.r3corda.node.services.network.NetworkMapService.FetchMapResponse
import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse
import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.sendRequest
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.security.PublicKey import java.security.PublicKey
@ -82,17 +84,12 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
} }
// Fetch the network map and register for updates at the same time // Fetch the network map and register for updates at the same time
val sessionID = random63BitValue() val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress)
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID) val future = net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, networkMapAddress, RunOnCallerThread).map { resp ->
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = net.onNext<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map { resp ->
// We may not receive any nodes back, if the map hasn't changed since the version specified // We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) } resp.nodes?.forEach { processRegistration(it) }
Unit Unit
} }
net.send(FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress)
_registrationFuture.setFuture(future) _registrationFuture.setFuture(future)
return future return future
@ -119,19 +116,11 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
*/ */
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> { override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time // Fetch the network map and register for updates at the same time
val sessionID = random63BitValue() val req = NetworkMapService.SubscribeRequest(false, net.myAddress)
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID) val future = net.sendRequest<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, req, service.address, RunOnCallerThread).map {
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = net.onNext<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
} }
_registrationFuture.setFuture(future) _registrationFuture.setFuture(future)
net.send(SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
return future return future
} }

View File

@ -14,6 +14,7 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
/** /**
* A network map contains lists of nodes on the network along with information about their identity keys, services * A network map contains lists of nodes on the network along with information about their identity keys, services
* they provide and host names or IP addresses where they can be connected to. This information is cached locally within * they provide and host names or IP addresses where they can be connected to. This information is cached locally within
@ -67,18 +67,27 @@ interface NetworkMapService {
val nodes: List<NodeInfo> val nodes: List<NodeInfo>
abstract class NetworkMapRequestMessage(val replyTo: MessageRecipients) : ServiceRequestMessage { class FetchMapRequest(val subscribe: Boolean,
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyTo val ifChangedSinceVersion: Int?,
} override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int) data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
class QueryIdentityRequest(val identity: Party,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long) : ServiceRequestMessage
data class QueryIdentityResponse(val node: NodeInfo?) data class QueryIdentityResponse(val node: NodeInfo?)
class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
class RegistrationRequest(val wireReg: WireNodeRegistration,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
data class RegistrationResponse(val success: Boolean) data class RegistrationResponse(val success: Boolean)
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
class SubscribeRequest(val subscribe: Boolean,
override val replyTo: SingleMessageRecipient,
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
data class SubscribeResponse(val confirmed: Boolean) data class SubscribeResponse(val confirmed: Boolean)
data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients) data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients) data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
} }

View File

@ -2,13 +2,21 @@ package com.r3corda.node.services
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.map import com.r3corda.core.map
import com.r3corda.core.random63BitValue import com.r3corda.core.messaging.send
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.node.services.network.AbstractNetworkMapService import com.r3corda.node.services.network.AbstractNetworkMapService
import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NetworkMapService.*
import com.r3corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.sendRequest
import com.r3corda.testing.node.MockNetwork import com.r3corda.testing.node.MockNetwork
import com.r3corda.testing.node.MockNetwork.MockNode
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.security.PrivateKey import java.security.PrivateKey
@ -24,8 +32,8 @@ import kotlin.test.assertTrue
*/ */
abstract class AbstractNetworkMapServiceTest { abstract class AbstractNetworkMapServiceTest {
protected fun success(mapServiceNode: MockNetwork.MockNode, protected fun success(mapServiceNode: MockNode,
registerNode: MockNetwork.MockNode, registerNode: MockNode,
service: () -> AbstractNetworkMapService, service: () -> AbstractNetworkMapService,
swizzle: () -> Unit) { swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
@ -41,14 +49,14 @@ abstract class AbstractNetworkMapServiceTest {
val nodeKey = registerNode.storage.myLegalIdentityKey val nodeKey = registerNode.storage.myLegalIdentityKey
val addChange = NodeRegistration(registerNode.info, instant.toEpochMilli(), AddOrRemove.ADD, expires) val addChange = NodeRegistration(registerNode.info, instant.toEpochMilli(), AddOrRemove.ADD, expires)
val addWireChange = addChange.toWire(nodeKey.private) val addWireChange = addChange.toWire(nodeKey.private)
service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
swizzle() swizzle()
assertEquals(1, service().nodes.count()) assertEquals(1, service().nodes.count())
assertEquals(registerNode.info, service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) assertEquals(registerNode.info, service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
// Re-registering should be a no-op // Re-registering should be a no-op
service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE)) service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
swizzle() swizzle()
assertEquals(1, service().nodes.count()) assertEquals(1, service().nodes.count())
@ -56,26 +64,26 @@ abstract class AbstractNetworkMapServiceTest {
// Confirm that de-registering the node succeeds and drops it from the node lists // Confirm that de-registering the node succeeds and drops it from the node lists
val removeChange = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires) val removeChange = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires)
val removeWireChange = removeChange.toWire(nodeKey.private) val removeWireChange = removeChange.toWire(nodeKey.private)
assert(service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) assert(service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
swizzle() swizzle()
assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
swizzle() swizzle()
// Trying to de-register a node that doesn't exist should fail // Trying to de-register a node that doesn't exist should fail
assert(!service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) assert(!service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
} }
protected fun `success with network`(network: MockNetwork, protected fun `success with network`(network: MockNetwork,
mapServiceNode: MockNetwork.MockNode, mapServiceNode: MockNode,
registerNode: MockNetwork.MockNode, registerNode: MockNode,
swizzle: () -> Unit) { swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
swizzle() swizzle()
// Confirm all nodes have registered themselves // Confirm all nodes have registered themselves
network.runNetwork() network.runNetwork()
var fetchPsm = fetchMap(registerNode, mapServiceNode, false) var fetchPsm = registerNode.fetchMap(mapServiceNode, false)
network.runNetwork() network.runNetwork()
assertEquals(2, fetchPsm.get()?.count()) assertEquals(2, fetchPsm.get()?.count())
@ -84,21 +92,21 @@ abstract class AbstractNetworkMapServiceTest {
val instant = Instant.now() val instant = Instant.now()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires) val reg = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires)
val registerPsm = registration(registerNode, mapServiceNode, reg, nodeKey.private) val registerPsm = registerNode.registration(mapServiceNode, reg, nodeKey.private)
network.runNetwork() network.runNetwork()
assertTrue(registerPsm.get().success) assertTrue(registerPsm.get().success)
swizzle() swizzle()
// Now only map service node should be registered // Now only map service node should be registered
fetchPsm = fetchMap(registerNode, mapServiceNode, false) fetchPsm = registerNode.fetchMap(mapServiceNode, false)
network.runNetwork() network.runNetwork()
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single()) assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
} }
protected fun `subscribe with network`(network: MockNetwork, protected fun `subscribe with network`(network: MockNetwork,
mapServiceNode: MockNetwork.MockNode, mapServiceNode: MockNode,
registerNode: MockNetwork.MockNode, registerNode: MockNode,
service: () -> AbstractNetworkMapService, service: () -> AbstractNetworkMapService,
swizzle: () -> Unit) { swizzle: () -> Unit) {
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved. // For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
@ -106,7 +114,7 @@ abstract class AbstractNetworkMapServiceTest {
// Test subscribing to updates // Test subscribing to updates
network.runNetwork() network.runNetwork()
val subscribePsm = subscribe(registerNode, mapServiceNode, true) val subscribePsm = registerNode.subscribe(mapServiceNode, true)
network.runNetwork() network.runNetwork()
subscribePsm.get() subscribePsm.get()
@ -131,7 +139,7 @@ abstract class AbstractNetworkMapServiceTest {
assertEquals(1, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1)) assertEquals(1, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
// Send in an acknowledgment and verify the count goes down // Send in an acknowledgment and verify the count goes down
updateAcknowlege(registerNode, mapServiceNode, startingMapVersion + 1) registerNode.updateAcknowlege(mapServiceNode, startingMapVersion + 1)
network.runNetwork() network.runNetwork()
swizzle() swizzle()
@ -155,24 +163,25 @@ abstract class AbstractNetworkMapServiceTest {
} }
} }
private fun registration(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<NetworkMapService.RegistrationResponse> { private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<RegistrationResponse> {
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), registerNode.services.networkService.myAddress, random63BitValue()) val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress)
return registerNode.sendAndReceive<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_PROTOCOL_TOPIC, mapServiceNode, req) return services.networkService.sendRequest(REGISTER_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
} }
private fun subscribe(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, subscribe: Boolean): ListenableFuture<NetworkMapService.SubscribeResponse> { private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture<SubscribeResponse> {
val req = NetworkMapService.SubscribeRequest(subscribe, registerNode.services.networkService.myAddress, random63BitValue()) val req = SubscribeRequest(subscribe, services.networkService.myAddress)
return registerNode.sendAndReceive<NetworkMapService.SubscribeResponse>(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, mapServiceNode, req) return services.networkService.sendRequest(SUBSCRIPTION_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
} }
private fun updateAcknowlege(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, mapVersion: Int) { private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) {
val req = NetworkMapService.UpdateAcknowledge(mapVersion, registerNode.services.networkService.myAddress) val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
registerNode.send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, mapServiceNode, req) services.networkService.send(PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
} }
private fun fetchMap(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> { private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, registerNode.services.networkService.myAddress, random63BitValue()) val net = services.networkService
return registerNode.sendAndReceive<NetworkMapService.FetchMapResponse>(NetworkMapService.FETCH_PROTOCOL_TOPIC, mapServiceNode, req).map { it.nodes } val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress)
return net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
} }
} }

View File

@ -2,13 +2,9 @@ package com.r3corda.testing.node
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.div import com.r3corda.core.div
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.onNext
import com.r3corda.core.messaging.send
import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.PhysicalLocation
import com.r3corda.core.node.services.KeyManagementService import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.node.services.ServiceInfo import com.r3corda.core.node.services.ServiceInfo
@ -23,13 +19,12 @@ import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.node.services.messaging.CordaRPCOps import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DBCheckpointStorage import com.r3corda.node.services.persistence.DBCheckpointStorage
import com.r3corda.node.services.persistence.PerFileCheckpointStorage import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.databaseTransaction import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.protocols.ServiceRequestMessage
import org.slf4j.Logger import org.slf4j.Logger
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
@ -146,21 +141,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block) return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
} }
fun send(topic: String, target: MockNode, payload: Any) {
services.networkService.send(TopicSession(topic), payload, target.info.address)
}
fun <M : Any> receive(topic: String, sessionId: Long): ListenableFuture<M> {
return services.networkService.onNext<M>(topic, sessionId)
}
inline fun <reified T : Any> sendAndReceive(topic: String,
target: MockNode,
payload: ServiceRequestMessage): ListenableFuture<T> {
send(topic, target, payload)
return receive(topic, payload.sessionID)
}
fun disableDBCloseOnStop() { fun disableDBCloseOnStop() {
runOnStop.remove(dbCloser) runOnStop.remove(dbCloser)
} }