Moved topic parameter from send/receive methods to a ProtocolLogic property

This commit is contained in:
Shams Asari
2016-07-05 10:22:11 +01:00
parent 4356cef1cd
commit 5c0e7fbbf2
28 changed files with 223 additions and 195 deletions

View File

@ -61,8 +61,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
showConsensusFor(listOf(buyer, seller, notary))
showProgressFor(listOf(buyer, seller))
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", sellerProtocol)
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
return Futures.successfulAsList(buyerFuture, sellerFuture)
}

View File

@ -14,7 +14,7 @@ import protocols.NotaryChangeProtocol
*/
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
init {
addMessageHandler(NotaryChangeProtocol.TOPIC_INITIATE,
addMessageHandler(NotaryChangeProtocol.TOPIC,
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
)
}
@ -24,7 +24,7 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, n
req.replyToParty,
req.sessionID,
req.sessionIdForSend)
smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
smm.add(NotaryChangeProtocol.TOPIC, protocol)
return Ack
}
}

View File

@ -17,6 +17,7 @@ import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.AcceptsFileUpload
import com.r3corda.node.utilities.FiberBox
import com.r3corda.protocols.RatesFixProtocol
import com.r3corda.protocols.ServiceRequestMessage
import org.slf4j.LoggerFactory
import java.io.InputStream
import java.math.BigDecimal
@ -48,32 +49,36 @@ object NodeInterestRates {
private val logger = LoggerFactory.getLogger(Service::class.java)
init {
addMessageHandler(RatesFixProtocol.TOPIC_SIGN,
{ req: RatesFixProtocol.SignRequest -> oracle.sign(req.tx) },
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
addMessageHandler(RatesFixProtocol.TOPIC_QUERY,
{ req: RatesFixProtocol.QueryRequest ->
/**
* We put this into a protocol so that if it blocks waiting for the interest rate to become
* available, we a) don't block this thread and b) allow the fact we are waiting
* to be persisted/checkpointed.
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way.
*/
node.smm.add("fixing", FixQueryHandler(this, req))
return@addMessageHandler
addMessageHandler(RatesFixProtocol.TOPIC,
{ req: ServiceRequestMessage ->
if (req is RatesFixProtocol.SignRequest) {
oracle.sign(req.tx)
}
else {
/**
* We put this into a protocol so that if it blocks waiting for the interest rate to become
* available, we a) don't block this thread and b) allow the fact we are waiting
* to be persisted/checkpointed.
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way.
*/
node.smm.add("fixing", FixQueryHandler(this, req as RatesFixProtocol.QueryRequest))
Unit
}
},
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
}
private class FixQueryHandler(val service: Service, val request: RatesFixProtocol.QueryRequest) : ProtocolLogic<Unit>() {
private class FixQueryHandler(val service: Service,
val request: RatesFixProtocol.QueryRequest) : ProtocolLogic<Unit>() {
companion object {
object RECEIVED : ProgressTracker.Step("Received fix request")
object SENDING : ProgressTracker.Step("Sending fix response")
}
override val topic: String get() = RatesFixProtocol.TOPIC
override val progressTracker = ProgressTracker(RECEIVED, SENDING)
init {
@ -84,9 +89,8 @@ object NodeInterestRates {
override fun call(): Unit {
val answers = service.oracle.query(request.queries, request.deadline)
progressTracker.currentStep = SENDING
send("${RatesFixProtocol.TOPIC}.query", request.replyToParty, request.sessionID, answers)
send(request.replyToParty, request.sessionID, answers)
}
}
// File upload support

View File

@ -253,8 +253,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
request.payload?.let {
val topic = "${request.topic}.${request.sessionIDForSend}"
psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" }
val address = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name)!!.address
serviceHub.networkService.send(topic, it, address)
val node = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name)
requireNotNull(node) { "Don't know about ${request.destination}" }
serviceHub.networkService.send(topic, it, node!!.address)
}
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.

View File

@ -32,7 +32,7 @@ abstract class NotaryService(val smm: StateMachineManager,
abstract val protocolFactory: NotaryProtocol.Factory
init {
addMessageHandler(NotaryProtocol.TOPIC_INITIATE,
addMessageHandler(NotaryProtocol.TOPIC,
{ req: NotaryProtocol.Handshake -> processRequest(req) }
)
}

View File

@ -58,14 +58,14 @@ class TwoPartyTradeProtocolTests {
otherSide: Party, assetToSell: StateAndRef<OwnableState>, price: Amount<Issued<Currency>>,
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", seller)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller)
}
private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
otherSide: Party, acceptablePrice: Amount<Issued<Currency>>, typeToBuy: Class<out OwnableState>,
sessionID: Long): ListenableFuture<SignedTransaction> {
val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyer)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer)
}
@Before

View File

@ -68,50 +68,45 @@ class InMemoryNetworkMapServiceTest {
assert(!service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
}
class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash)
: ProtocolLogic<Unit>() {
class TestAcknowledgePSM(val server: NodeInfo, val hash: SecureHash) : ProtocolLogic<Unit>() {
override val topic: String get() = NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC
@Suspendable
override fun call() {
val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress)
send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.identity, 0, req)
send(server.identity, 0, req)
}
}
class TestFetchPSM(val server: NodeInfo, val subscribe: Boolean, val ifChangedSinceVersion: Int? = null)
: ProtocolLogic<Collection<NodeRegistration>?>() {
override val topic: String get() = NetworkMapService.FETCH_PROTOCOL_TOPIC
@Suspendable
override fun call(): Collection<NodeRegistration>? {
val sessionID = random63BitValue()
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.FetchMapResponse>(
NetworkMapService.FETCH_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
.validate { it.nodes }
return sendAndReceive<NetworkMapService.FetchMapResponse>(server.identity, 0, sessionID, req).validate { it.nodes }
}
}
class TestRegisterPSM(val server: NodeInfo, val reg: NodeRegistration, val privateKey: PrivateKey)
: ProtocolLogic<NetworkMapService.RegistrationResponse>() {
override val topic: String get() = NetworkMapService.REGISTER_PROTOCOL_TOPIC
@Suspendable
override fun call(): NetworkMapService.RegistrationResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.RegistrationResponse>(
NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
.validate { it }
return sendAndReceive<NetworkMapService.RegistrationResponse>(server.identity, 0, sessionID, req).validate { it }
}
}
class TestSubscribePSM(val server: NodeInfo, val subscribe: Boolean)
: ProtocolLogic<NetworkMapService.SubscribeResponse>() {
override val topic: String get() = NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC
@Suspendable
override fun call(): NetworkMapService.SubscribeResponse {
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID)
return sendAndReceive<NetworkMapService.SubscribeResponse>(
NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
.validate { it }
return sendAndReceive<NetworkMapService.SubscribeResponse>(server.identity, 0, sessionID, req).validate { it }
}
}

View File

@ -97,6 +97,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
(serviceHub as TestReference).testReference.calls += increment
(serviceHub as TestReference).testReference.countDown.countDown()
}
override val topic: String get() = throw UnsupportedOperationException()
}
class Command : TypeOnlyCommandData()

View File

@ -59,6 +59,8 @@ class StateMachineManagerTests {
protocolStarted = true
Fiber.park()
}
override val topic: String get() = throw UnsupportedOperationException()
}
@ -68,6 +70,8 @@ class StateMachineManagerTests {
@Suspendable
override fun call() {}
override val topic: String get() = throw UnsupportedOperationException()
}

View File

@ -11,10 +11,10 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import org.junit.Before
import org.junit.Test
import protocols.StateReplacementException
import protocols.StateReplacementRefused
import protocols.NotaryChangeProtocol
import protocols.NotaryChangeProtocol.Instigator
import protocols.StateReplacementException
import protocols.StateReplacementRefused
import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -46,7 +46,7 @@ class NotaryChangeTests {
val state = issueState(clientNodeA)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()
@ -59,7 +59,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()
@ -75,7 +75,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
val protocol = Instigator(state, newEvilNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()