mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
Add support for re-sending session messages. This is useful when talking to a distributed service, e.g. notary – if one of the nodes go down in the middle of a session, the session will be re-established with a different node (round-robin order).
This commit is contained in:
parent
606bba590b
commit
9a0653128c
@ -78,6 +78,23 @@ abstract class FlowLogic<out T> {
|
|||||||
return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow)
|
return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @see sendAndReceiveWithRetry */
|
||||||
|
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any) = sendAndReceiveWithRetry(R::class.java, otherParty, payload)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
|
||||||
|
*
|
||||||
|
* Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead.
|
||||||
|
* It is only intended for the case where the [otherParty] is running a distributed service with an idempotent
|
||||||
|
* flow which only accepts a single request and sends back a single response – e.g. a notary or certain types of
|
||||||
|
* oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
|
||||||
|
* to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
internal open fun <R : Any> sendAndReceiveWithRetry(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
|
||||||
|
return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow, true)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Suspends until the specified [otherParty] sends us a message of type [R].
|
* Suspends until the specified [otherParty] sends us a message of type [R].
|
||||||
*
|
*
|
||||||
|
@ -49,7 +49,8 @@ interface FlowStateMachine<R> {
|
|||||||
fun <T : Any> sendAndReceive(receiveType: Class<T>,
|
fun <T : Any> sendAndReceive(receiveType: Class<T>,
|
||||||
otherParty: Party,
|
otherParty: Party,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
sessionFlow: FlowLogic<*>): UntrustworthyData<T>
|
sessionFlow: FlowLogic<*>,
|
||||||
|
retrySend: Boolean = false): UntrustworthyData<T>
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T>
|
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T>
|
||||||
|
@ -61,7 +61,7 @@ object NotaryFlow {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val response = try {
|
val response = try {
|
||||||
sendAndReceive<List<DigitalSignature.WithKey>>(notaryParty, payload)
|
sendAndReceiveWithRetry<List<DigitalSignature.WithKey>>(notaryParty, payload)
|
||||||
} catch (e: NotaryException) {
|
} catch (e: NotaryException) {
|
||||||
if (e.error is NotaryError.Conflict) {
|
if (e.error is NotaryError.Conflict) {
|
||||||
e.error.conflict.verified()
|
e.error.conflict.verified()
|
||||||
|
@ -12,9 +12,7 @@ import net.corda.core.serialization.deserialize
|
|||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.internal.Node
|
import net.corda.node.internal.Node
|
||||||
import net.corda.node.services.messaging.ServiceRequestMessage
|
import net.corda.node.services.messaging.*
|
||||||
import net.corda.node.services.messaging.createMessage
|
|
||||||
import net.corda.node.services.messaging.sendRequest
|
|
||||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||||
import net.corda.node.services.transactions.SimpleNotaryService
|
import net.corda.node.services.transactions.SimpleNotaryService
|
||||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||||
@ -23,6 +21,10 @@ import net.corda.testing.node.NodeBasedTest
|
|||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
class P2PMessagingTest : NodeBasedTest() {
|
class P2PMessagingTest : NodeBasedTest() {
|
||||||
@Test
|
@Test
|
||||||
@ -87,6 +89,100 @@ class P2PMessagingTest : NodeBasedTest() {
|
|||||||
assertAllNodesAreUsed(distributedService, serviceName, distributedService[0])
|
assertAllNodesAreUsed(distributedService, serviceName, distributedService[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() {
|
||||||
|
val distributedServiceNodes = startNotaryCluster("DistributedService", 2).getOrThrow()
|
||||||
|
val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
|
||||||
|
val serviceAddress = alice.services.networkMapCache.run {
|
||||||
|
alice.net.getAddressOfParty(getPartyInfo(getAnyNotary()!!)!!)
|
||||||
|
}
|
||||||
|
|
||||||
|
val dummyTopic = "dummy.topic"
|
||||||
|
val responseMessage = "response"
|
||||||
|
|
||||||
|
simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage)
|
||||||
|
|
||||||
|
// Send a single request with retry
|
||||||
|
val response = with(alice.net) {
|
||||||
|
val request = TestRequest(replyTo = myAddress)
|
||||||
|
val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
|
||||||
|
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
|
||||||
|
send(msg, serviceAddress, retryId = request.sessionID)
|
||||||
|
responseFuture
|
||||||
|
}.getOrThrow(10.seconds)
|
||||||
|
|
||||||
|
assertThat(response).isEqualTo(responseMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `distributed service request retries are persisted across client node restarts`() {
|
||||||
|
val distributedServiceNodes = startNotaryCluster("DistributedService", 2).getOrThrow()
|
||||||
|
val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
|
||||||
|
val serviceAddress = alice.services.networkMapCache.run {
|
||||||
|
alice.net.getAddressOfParty(getPartyInfo(getAnyNotary()!!)!!)
|
||||||
|
}
|
||||||
|
|
||||||
|
val dummyTopic = "dummy.topic"
|
||||||
|
val responseMessage = "response"
|
||||||
|
|
||||||
|
val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage)
|
||||||
|
|
||||||
|
val sessionId = random63BitValue()
|
||||||
|
|
||||||
|
// Send a single request with retry
|
||||||
|
with(alice.net) {
|
||||||
|
val request = TestRequest(sessionId, myAddress)
|
||||||
|
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
|
||||||
|
send(msg, serviceAddress, retryId = request.sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until the first request is received
|
||||||
|
firstRequestReceived.await(5, TimeUnit.SECONDS)
|
||||||
|
// Stop alice's node before the request is redelivered – the first request is ignored
|
||||||
|
alice.stop()
|
||||||
|
assertThat(requestsReceived.get()).isEqualTo(1)
|
||||||
|
|
||||||
|
// Restart the node and expect a response
|
||||||
|
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
|
||||||
|
val response = aliceRestarted.net.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
|
||||||
|
|
||||||
|
assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2)
|
||||||
|
assertThat(response).isEqualTo(responseMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to
|
||||||
|
* receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives
|
||||||
|
* a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests.
|
||||||
|
*/
|
||||||
|
private fun simulateCrashingNode(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): Pair<CountDownLatch, AtomicInteger> {
|
||||||
|
val firstToReceive = AtomicBoolean(true)
|
||||||
|
val requestsReceived = AtomicInteger(0)
|
||||||
|
val firstRequestReceived = CountDownLatch(1)
|
||||||
|
distributedServiceNodes.forEach {
|
||||||
|
val nodeName = it.info.legalIdentity.name
|
||||||
|
var ignoreRequests = false
|
||||||
|
it.net.addMessageHandler(dummyTopic, DEFAULT_SESSION_ID) { netMessage, _ ->
|
||||||
|
requestsReceived.incrementAndGet()
|
||||||
|
firstRequestReceived.countDown()
|
||||||
|
// The node which receives the first request will ignore all requests
|
||||||
|
if (firstToReceive.getAndSet(false)) ignoreRequests = true
|
||||||
|
print("$nodeName: Received request - ")
|
||||||
|
if (ignoreRequests) {
|
||||||
|
println("ignoring")
|
||||||
|
// Requests are ignored to simulate a service node crashing before sending back a response.
|
||||||
|
// A retry by the client will result in the message being redelivered to another node in the service cluster.
|
||||||
|
} else {
|
||||||
|
println("sending response")
|
||||||
|
val request = netMessage.data.deserialize<TestRequest>()
|
||||||
|
val response = it.net.createMessage(dummyTopic, request.sessionID, responseMessage.serialize().bytes)
|
||||||
|
it.net.send(response, request.replyTo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Pair(firstRequestReceived, requestsReceived)
|
||||||
|
}
|
||||||
|
|
||||||
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: String, originatingNode: Node) {
|
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: String, originatingNode: Node) {
|
||||||
// Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used
|
// Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used
|
||||||
participatingServiceNodes.forEach { node ->
|
participatingServiceNodes.forEach { node ->
|
||||||
|
@ -32,6 +32,7 @@ interface NodeConfiguration : SSLConfiguration {
|
|||||||
val certificateSigningService: URL
|
val certificateSigningService: URL
|
||||||
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
|
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
|
||||||
val verifierType: VerifierType
|
val verifierType: VerifierType
|
||||||
|
val messageRedeliveryDelaySeconds: Int
|
||||||
}
|
}
|
||||||
|
|
||||||
data class FullNodeConfiguration(
|
data class FullNodeConfiguration(
|
||||||
@ -51,6 +52,7 @@ data class FullNodeConfiguration(
|
|||||||
override val minimumPlatformVersion: Int = 1,
|
override val minimumPlatformVersion: Int = 1,
|
||||||
override val rpcUsers: List<User>,
|
override val rpcUsers: List<User>,
|
||||||
override val verifierType: VerifierType,
|
override val verifierType: VerifierType,
|
||||||
|
override val messageRedeliveryDelaySeconds: Int = 30,
|
||||||
val useHTTPS: Boolean,
|
val useHTTPS: Boolean,
|
||||||
@OldConfig("artemisAddress")
|
@OldConfig("artemisAddress")
|
||||||
val p2pAddress: HostAndPort,
|
val p2pAddress: HostAndPort,
|
||||||
|
@ -74,8 +74,14 @@ interface MessagingService {
|
|||||||
*
|
*
|
||||||
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
|
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
|
||||||
* to send an ACK message back.
|
* to send an ACK message back.
|
||||||
|
*
|
||||||
|
* @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id.
|
||||||
|
* Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
|
||||||
*/
|
*/
|
||||||
fun send(message: Message, target: MessageRecipients)
|
fun send(message: Message, target: MessageRecipients, retryId: Long? = null)
|
||||||
|
|
||||||
|
/** Cancels the scheduled message redelivery for the specified [retryId] */
|
||||||
|
fun cancelRedelivery(retryId: Long)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||||
@ -158,8 +164,8 @@ fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): Listenabl
|
|||||||
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||||
= send(TopicSession(topic, sessionID), payload, to, uuid)
|
= send(TopicSession(topic, sessionID), payload, to, uuid)
|
||||||
|
|
||||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null)
|
||||||
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to)
|
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||||
|
@ -43,9 +43,7 @@ import org.jetbrains.exposed.sql.statements.InsertStatement
|
|||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.*
|
||||||
import java.util.concurrent.CountDownLatch
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
|
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
|
||||||
@ -92,6 +90,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
private val platformVersionProperty = SimpleString("platform-version")
|
private val platformVersionProperty = SimpleString("platform-version")
|
||||||
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
|
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
|
||||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||||
|
|
||||||
|
private val messageMaxRetryCount: Int = 3
|
||||||
}
|
}
|
||||||
|
|
||||||
private class InnerState {
|
private class InnerState {
|
||||||
@ -108,6 +108,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
var verificationResponseConsumer: ClientConsumer? = null
|
var verificationResponseConsumer: ClientConsumer? = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val messagesToRedeliver = database.transaction {
|
||||||
|
JDBCHashMap<Long, Pair<Message, MessageRecipients>>("${NODE_DATABASE_PREFIX}message_retry", true)
|
||||||
|
}
|
||||||
|
|
||||||
|
val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
|
||||||
|
|
||||||
val verifierService = when (config.verifierType) {
|
val verifierService = when (config.verifierType) {
|
||||||
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||||
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
|
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
|
||||||
@ -201,6 +207,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resumeMessageRedelivery()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -217,6 +225,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
session.createConsumer(P2P_QUEUE)
|
session.createConsumer(P2P_QUEUE)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun resumeMessageRedelivery() {
|
||||||
|
messagesToRedeliver.forEach {
|
||||||
|
retryId, (message, target) -> send(message, target, retryId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private var shutdownLatch = CountDownLatch(1)
|
private var shutdownLatch = CountDownLatch(1)
|
||||||
|
|
||||||
private fun processMessage(consumer: ClientConsumer): Boolean {
|
private fun processMessage(consumer: ClientConsumer): Boolean {
|
||||||
@ -405,7 +419,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun send(message: Message, target: MessageRecipients) {
|
override fun send(message: Message, target: MessageRecipients, retryId: Long?) {
|
||||||
// We have to perform sending on a different thread pool, since using the same pool for messaging and
|
// We have to perform sending on a different thread pool, since using the same pool for messaging and
|
||||||
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
|
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
|
||||||
messagingExecutor.fetchFrom {
|
messagingExecutor.fetchFrom {
|
||||||
@ -431,10 +445,55 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
"sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}"
|
"sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}"
|
||||||
}
|
}
|
||||||
producer!!.send(mqAddress, artemisMessage)
|
producer!!.send(mqAddress, artemisMessage)
|
||||||
|
|
||||||
|
retryId?.let {
|
||||||
|
database.transaction {
|
||||||
|
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
|
||||||
|
}
|
||||||
|
scheduledMessageRedeliveries[it] = messagingExecutor.schedule({
|
||||||
|
sendWithRetry(0, mqAddress, artemisMessage, it)
|
||||||
|
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun sendWithRetry(retryCount: Int, address: String, message: ClientMessage, retryId: Long) {
|
||||||
|
fun randomiseDuplicateId(message: ClientMessage) {
|
||||||
|
message.putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.trace { "Attempting to retry #$retryCount message delivery for $retryId" }
|
||||||
|
if (retryCount >= messageMaxRetryCount) {
|
||||||
|
log.warn("Reached the maximum number of retries ($messageMaxRetryCount) for message $message redelivery to $address")
|
||||||
|
scheduledMessageRedeliveries.remove(retryId)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
randomiseDuplicateId(message)
|
||||||
|
|
||||||
|
state.locked {
|
||||||
|
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
|
||||||
|
producer!!.send(address, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({
|
||||||
|
sendWithRetry(retryCount + 1, address, message, retryId)
|
||||||
|
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun cancelRedelivery(retryId: Long) {
|
||||||
|
database.transaction {
|
||||||
|
messagesToRedeliver.remove(retryId)
|
||||||
|
}
|
||||||
|
scheduledMessageRedeliveries[retryId]?.let {
|
||||||
|
log.trace { "Cancelling message redelivery for retry id $retryId" }
|
||||||
|
if (!it.isDone) it.cancel(true)
|
||||||
|
scheduledMessageRedeliveries.remove(retryId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun getMQAddress(target: MessageRecipients): String {
|
private fun getMQAddress(target: MessageRecipients): String {
|
||||||
return if (target == myAddress) {
|
return if (target == myAddress) {
|
||||||
// If we are sending to ourselves then route the message directly to our P2P queue.
|
// If we are sending to ourselves then route the message directly to our P2P queue.
|
||||||
|
@ -6,11 +6,17 @@ import net.corda.node.services.statemachine.FlowSessionState.Initiated
|
|||||||
import net.corda.node.services.statemachine.FlowSessionState.Initiating
|
import net.corda.node.services.statemachine.FlowSessionState.Initiating
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param retryable Indicates that the session initialisation should be retried until an expected [SessionData] response
|
||||||
|
* is received. Note that this requires the party on the other end to be a distributed service and run an idempotent flow
|
||||||
|
* that only sends back a single [SessionData] message before termination.
|
||||||
|
*/
|
||||||
class FlowSession(
|
class FlowSession(
|
||||||
val flow: FlowLogic<*>,
|
val flow: FlowLogic<*>,
|
||||||
val ourSessionId: Long,
|
val ourSessionId: Long,
|
||||||
val initiatingParty: Party?,
|
val initiatingParty: Party?,
|
||||||
var state: FlowSessionState) {
|
var state: FlowSessionState,
|
||||||
|
val retryable: Boolean = false) {
|
||||||
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<*>>()
|
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<*>>()
|
||||||
val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*>
|
val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*>
|
||||||
|
|
||||||
|
@ -174,11 +174,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
override fun <T : Any> sendAndReceive(receiveType: Class<T>,
|
override fun <T : Any> sendAndReceive(receiveType: Class<T>,
|
||||||
otherParty: Party,
|
otherParty: Party,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
sessionFlow: FlowLogic<*>,
|
||||||
|
retrySend: Boolean): UntrustworthyData<T> {
|
||||||
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
|
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
|
||||||
val session = getConfirmedSession(otherParty, sessionFlow)
|
val session = getConfirmedSession(otherParty, sessionFlow)
|
||||||
val sessionData = if (session == null) {
|
val sessionData = if (session == null) {
|
||||||
val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true)
|
val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend)
|
||||||
// Only do a receive here as the session init has carried the payload
|
// Only do a receive here as the session init has carried the payload
|
||||||
receiveInternal<SessionData>(newSession, receiveType)
|
receiveInternal<SessionData>(newSession, receiveType)
|
||||||
} else {
|
} else {
|
||||||
@ -292,9 +293,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
|
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
|
||||||
*/
|
*/
|
||||||
@Suspendable
|
@Suspendable
|
||||||
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession {
|
private fun startNewSession(otherParty: Party,
|
||||||
|
sessionFlow: FlowLogic<*>,
|
||||||
|
firstPayload: Any?,
|
||||||
|
waitForConfirmation: Boolean,
|
||||||
|
retryable: Boolean = false): FlowSession {
|
||||||
logger.trace { "Initiating a new session with $otherParty" }
|
logger.trace { "Initiating a new session with $otherParty" }
|
||||||
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty))
|
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
|
||||||
openSessions[Pair(sessionFlow, otherParty)] = session
|
openSessions[Pair(sessionFlow, otherParty)] = session
|
||||||
// We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class
|
// We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class
|
||||||
val clientFlowClass = sessionFlow.topConcreteFlowClass
|
val clientFlowClass = sessionFlow.topConcreteFlowClass
|
||||||
|
@ -297,6 +297,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
val session = openSessions[message.recipientSessionId]
|
val session = openSessions[message.recipientSessionId]
|
||||||
if (session != null) {
|
if (session != null) {
|
||||||
session.fiber.logger.trace { "Received $message on $session from $sender" }
|
session.fiber.logger.trace { "Received $message on $session from $sender" }
|
||||||
|
if (session.retryable) {
|
||||||
|
if (message is SessionConfirm && session.state is FlowSessionState.Initiated) {
|
||||||
|
session.fiber.logger.trace { "Ignoring duplicate confirmation for session ${session.ourSessionId} – session is idempotent" }
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (message !is SessionConfirm) {
|
||||||
|
serviceHub.networkService.cancelRedelivery(session.ourSessionId)
|
||||||
|
}
|
||||||
|
}
|
||||||
if (message is SessionEnd) {
|
if (message is SessionEnd) {
|
||||||
openSessions.remove(message.recipientSessionId)
|
openSessions.remove(message.recipientSessionId)
|
||||||
}
|
}
|
||||||
@ -319,7 +328,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
logger.trace { "Ignoring session end message for already closed session: $message" }
|
logger.trace { "Ignoring session end message for already closed session: $message" }
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Received a session message for unknown session: $message")
|
logger.warn("Received a session message for unknown session: $message, from $sender")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -523,42 +532,52 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
|
|
||||||
private fun processIORequest(ioRequest: FlowIORequest) {
|
private fun processIORequest(ioRequest: FlowIORequest) {
|
||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
if (ioRequest is SendRequest) {
|
when (ioRequest) {
|
||||||
if (ioRequest.message is SessionInit) {
|
is SendRequest -> processSendRequest(ioRequest)
|
||||||
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
|
is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun processSendRequest(ioRequest: SendRequest) {
|
||||||
|
val retryId = if (ioRequest.message is SessionInit) {
|
||||||
|
with(ioRequest.session) {
|
||||||
|
openSessions[ourSessionId] = this
|
||||||
|
if (retryable) ourSessionId else null
|
||||||
}
|
}
|
||||||
sendSessionMessage(ioRequest.session.state.sendToParty, ioRequest.message, ioRequest.session.fiber)
|
} else null
|
||||||
if (ioRequest !is ReceiveRequest<*>) {
|
sendSessionMessage(ioRequest.session.state.sendToParty, ioRequest.message, ioRequest.session.fiber, retryId)
|
||||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
if (ioRequest !is ReceiveRequest<*>) {
|
||||||
resumeFiber(ioRequest.session.fiber)
|
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||||
}
|
resumeFiber(ioRequest.session.fiber)
|
||||||
} else if (ioRequest is WaitForLedgerCommit) {
|
}
|
||||||
// Is it already committed?
|
}
|
||||||
val stx = database.transaction {
|
|
||||||
serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash)
|
private fun processWaitForCommitRequest(ioRequest: WaitForLedgerCommit) {
|
||||||
}
|
// Is it already committed?
|
||||||
if (stx != null) {
|
val stx = database.transaction {
|
||||||
resumeFiber(ioRequest.fiber)
|
serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash)
|
||||||
} else {
|
}
|
||||||
// No, then register to wait.
|
if (stx != null) {
|
||||||
//
|
resumeFiber(ioRequest.fiber)
|
||||||
// We assume this code runs on the server thread, which is the only place transactions are committed
|
} else {
|
||||||
// currently. When we liberalise our threading somewhat, handing of wait requests will need to be
|
// No, then register to wait.
|
||||||
// reworked to make the wait atomic in another way. Otherwise there is a race between checking the
|
//
|
||||||
// database and updating the waiting list.
|
// We assume this code runs on the server thread, which is the only place transactions are committed
|
||||||
mutex.locked {
|
// currently. When we liberalise our threading somewhat, handing of wait requests will need to be
|
||||||
fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber
|
// reworked to make the wait atomic in another way. Otherwise there is a race between checking the
|
||||||
}
|
// database and updating the waiting list.
|
||||||
|
mutex.locked {
|
||||||
|
fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null) {
|
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) {
|
||||||
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
|
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
|
||||||
?: throw IllegalArgumentException("Don't know about party $party")
|
?: throw IllegalArgumentException("Don't know about party $party")
|
||||||
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
|
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
|
||||||
val logger = fiber?.logger ?: logger
|
val logger = fiber?.logger ?: logger
|
||||||
logger.trace { "Sending $message to party $party @ $address" }
|
logger.trace { "Sending $message to party $party @ $address" + if (retryId != null) " with retry $retryId" else "" }
|
||||||
serviceHub.networkService.send(sessionTopic, message, address)
|
serviceHub.networkService.send(sessionTopic, message, address, retryId = retryId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ class InteractiveShellTest {
|
|||||||
fun party() = check("party: \"${someCorpLegalName}\"", someCorpLegalName.toString())
|
fun party() = check("party: \"${someCorpLegalName}\"", someCorpLegalName.toString())
|
||||||
|
|
||||||
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> {
|
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> {
|
||||||
override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData<T> {
|
||||||
throw UnsupportedOperationException("not implemented")
|
throw UnsupportedOperationException("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,8 +169,8 @@ data class TestNodeConfiguration(
|
|||||||
override val devMode: Boolean = true,
|
override val devMode: Boolean = true,
|
||||||
override val certificateSigningService: URL = URL("http://localhost"),
|
override val certificateSigningService: URL = URL("http://localhost"),
|
||||||
override val certificateChainCheckPolicies: List<CertChainPolicyConfig> = emptyList(),
|
override val certificateChainCheckPolicies: List<CertChainPolicyConfig> = emptyList(),
|
||||||
override val verifierType: VerifierType = VerifierType.InMemory) : NodeConfiguration {
|
override val verifierType: VerifierType = VerifierType.InMemory,
|
||||||
}
|
override val messageRedeliveryDelaySeconds: Int = 5) : NodeConfiguration
|
||||||
|
|
||||||
fun testConfiguration(baseDirectory: Path, legalName: String, basePort: Int): FullNodeConfiguration {
|
fun testConfiguration(baseDirectory: Path, legalName: String, basePort: Int): FullNodeConfiguration {
|
||||||
return FullNodeConfiguration(
|
return FullNodeConfiguration(
|
||||||
|
@ -359,7 +359,7 @@ class InMemoryMessagingNetwork(
|
|||||||
state.locked { check(handlers.remove(registration as Handler)) }
|
state.locked { check(handlers.remove(registration as Handler)) }
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun send(message: Message, target: MessageRecipients) {
|
override fun send(message: Message, target: MessageRecipients, retryId: Long?) {
|
||||||
check(running)
|
check(running)
|
||||||
msgSend(this, message, target)
|
msgSend(this, message, target)
|
||||||
if (!sendManuallyPumped) {
|
if (!sendManuallyPumped) {
|
||||||
@ -376,6 +376,8 @@ class InMemoryMessagingNetwork(
|
|||||||
netNodeHasShutdown(peerHandle)
|
netNodeHasShutdown(peerHandle)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun cancelRedelivery(retryId: Long) {}
|
||||||
|
|
||||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||||
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
|
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
|
||||||
return InMemoryMessage(topicSession, data, uuid)
|
return InMemoryMessage(topicSession, data, uuid)
|
||||||
|
Loading…
Reference in New Issue
Block a user