diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index c4a6d96ca4..ae79387908 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -78,6 +78,23 @@ abstract class FlowLogic { return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow) } + /** @see sendAndReceiveWithRetry */ + internal inline fun sendAndReceiveWithRetry(otherParty: Party, payload: Any) = sendAndReceiveWithRetry(R::class.java, otherParty, payload) + + /** + * Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received. + * + * Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead. + * It is only intended for the case where the [otherParty] is running a distributed service with an idempotent + * flow which only accepts a single request and sends back a single response – e.g. a notary or certain types of + * oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered + * to a different one, so there is no need to wait until the initial node comes back up to obtain a response. + */ + @Suspendable + internal open fun sendAndReceiveWithRetry(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { + return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow, true) + } + /** * Suspends until the specified [otherParty] sends us a message of type [R]. * diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index 171442be2c..9c9d07b1fe 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -49,7 +49,8 @@ interface FlowStateMachine { fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any, - sessionFlow: FlowLogic<*>): UntrustworthyData + sessionFlow: FlowLogic<*>, + retrySend: Boolean = false): UntrustworthyData @Suspendable fun receive(receiveType: Class, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index 7c74a8d618..97a393e00e 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -61,7 +61,7 @@ object NotaryFlow { } val response = try { - sendAndReceive>(notaryParty, payload) + sendAndReceiveWithRetry>(notaryParty, payload) } catch (e: NotaryException) { if (e.error is NotaryError.Conflict) { e.error.conflict.verified() diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 9ee1c38a8d..221b334f84 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -12,9 +12,7 @@ import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.* import net.corda.node.internal.Node -import net.corda.node.services.messaging.ServiceRequestMessage -import net.corda.node.services.messaging.createMessage -import net.corda.node.services.messaging.sendRequest +import net.corda.node.services.messaging.* import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.utilities.ServiceIdentityGenerator @@ -23,6 +21,10 @@ import net.corda.testing.node.NodeBasedTest import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger class P2PMessagingTest : NodeBasedTest() { @Test @@ -87,6 +89,100 @@ class P2PMessagingTest : NodeBasedTest() { assertAllNodesAreUsed(distributedService, serviceName, distributedService[0]) } + @Test + fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() { + val distributedServiceNodes = startNotaryCluster("DistributedService", 2).getOrThrow() + val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() + val serviceAddress = alice.services.networkMapCache.run { + alice.net.getAddressOfParty(getPartyInfo(getAnyNotary()!!)!!) + } + + val dummyTopic = "dummy.topic" + val responseMessage = "response" + + simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + + // Send a single request with retry + val response = with(alice.net) { + val request = TestRequest(replyTo = myAddress) + val responseFuture = onNext(dummyTopic, request.sessionID) + val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) + send(msg, serviceAddress, retryId = request.sessionID) + responseFuture + }.getOrThrow(10.seconds) + + assertThat(response).isEqualTo(responseMessage) + } + + @Test + fun `distributed service request retries are persisted across client node restarts`() { + val distributedServiceNodes = startNotaryCluster("DistributedService", 2).getOrThrow() + val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() + val serviceAddress = alice.services.networkMapCache.run { + alice.net.getAddressOfParty(getPartyInfo(getAnyNotary()!!)!!) + } + + val dummyTopic = "dummy.topic" + val responseMessage = "response" + + val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + + val sessionId = random63BitValue() + + // Send a single request with retry + with(alice.net) { + val request = TestRequest(sessionId, myAddress) + val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) + send(msg, serviceAddress, retryId = request.sessionID) + } + + // Wait until the first request is received + firstRequestReceived.await(5, TimeUnit.SECONDS) + // Stop alice's node before the request is redelivered – the first request is ignored + alice.stop() + assertThat(requestsReceived.get()).isEqualTo(1) + + // Restart the node and expect a response + val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() + val response = aliceRestarted.net.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) + + assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) + assertThat(response).isEqualTo(responseMessage) + } + + /** + * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to + * receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives + * a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests. + */ + private fun simulateCrashingNode(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): Pair { + val firstToReceive = AtomicBoolean(true) + val requestsReceived = AtomicInteger(0) + val firstRequestReceived = CountDownLatch(1) + distributedServiceNodes.forEach { + val nodeName = it.info.legalIdentity.name + var ignoreRequests = false + it.net.addMessageHandler(dummyTopic, DEFAULT_SESSION_ID) { netMessage, _ -> + requestsReceived.incrementAndGet() + firstRequestReceived.countDown() + // The node which receives the first request will ignore all requests + if (firstToReceive.getAndSet(false)) ignoreRequests = true + print("$nodeName: Received request - ") + if (ignoreRequests) { + println("ignoring") + // Requests are ignored to simulate a service node crashing before sending back a response. + // A retry by the client will result in the message being redelivered to another node in the service cluster. + } else { + println("sending response") + val request = netMessage.data.deserialize() + val response = it.net.createMessage(dummyTopic, request.sessionID, responseMessage.serialize().bytes) + it.net.send(response, request.replyTo) + } + } + } + return Pair(firstRequestReceived, requestsReceived) + } + private fun assertAllNodesAreUsed(participatingServiceNodes: List, serviceName: String, originatingNode: Node) { // Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used participatingServiceNodes.forEach { node -> diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index dae48ff4ec..d5156309e0 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -32,6 +32,7 @@ interface NodeConfiguration : SSLConfiguration { val certificateSigningService: URL val certificateChainCheckPolicies: List val verifierType: VerifierType + val messageRedeliveryDelaySeconds: Int } data class FullNodeConfiguration( @@ -51,6 +52,7 @@ data class FullNodeConfiguration( override val minimumPlatformVersion: Int = 1, override val rpcUsers: List, override val verifierType: VerifierType, + override val messageRedeliveryDelaySeconds: Int = 30, val useHTTPS: Boolean, @OldConfig("artemisAddress") val p2pAddress: HostAndPort, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 8699fc0257..d9003681d9 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -74,8 +74,14 @@ interface MessagingService { * * There is no way to know if a message has been received. If your flow requires this, you need the recipient * to send an ACK message back. + * + * @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id. + * Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary. */ - fun send(message: Message, target: MessageRecipients) + fun send(message: Message, target: MessageRecipients, retryId: Long? = null) + + /** Cancels the scheduled message redelivery for the specified [retryId] */ + fun cancelRedelivery(retryId: Long) /** * Returns an initialised [Message] with the current time, etc, already filled in. @@ -158,8 +164,8 @@ fun MessagingService.onNext(topic: String, sessionId: Long): Listenabl fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) = send(TopicSession(topic, sessionID), payload, to, uuid) -fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) - = send(createMessage(topicSession, payload.serialize().bytes, uuid), to) +fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null) + = send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId) /** * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 92ae5b644a..f2db4f1a40 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -43,9 +43,7 @@ import org.jetbrains.exposed.sql.statements.InsertStatement import java.security.PublicKey import java.time.Instant import java.util.* -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit +import java.util.concurrent.* import javax.annotation.concurrent.ThreadSafe // TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox @@ -92,6 +90,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, private val platformVersionProperty = SimpleString("platform-version") private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt() private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}" + + private val messageMaxRetryCount: Int = 3 } private class InnerState { @@ -108,6 +108,12 @@ class NodeMessagingClient(override val config: NodeConfiguration, var verificationResponseConsumer: ClientConsumer? = null } + val messagesToRedeliver = database.transaction { + JDBCHashMap>("${NODE_DATABASE_PREFIX}message_retry", true) + } + + val scheduledMessageRedeliveries = ConcurrentHashMap>() + val verifierService = when (config.verifierType) { VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) VerifierType.OutOfProcess -> createOutOfProcessVerifierService() @@ -201,6 +207,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS) } } + + resumeMessageRedelivery() } /** @@ -217,6 +225,12 @@ class NodeMessagingClient(override val config: NodeConfiguration, session.createConsumer(P2P_QUEUE) } + private fun resumeMessageRedelivery() { + messagesToRedeliver.forEach { + retryId, (message, target) -> send(message, target, retryId) + } + } + private var shutdownLatch = CountDownLatch(1) private fun processMessage(consumer: ClientConsumer): Boolean { @@ -405,7 +419,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - override fun send(message: Message, target: MessageRecipients) { + override fun send(message: Message, target: MessageRecipients, retryId: Long?) { // We have to perform sending on a different thread pool, since using the same pool for messaging and // fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals. messagingExecutor.fetchFrom { @@ -431,10 +445,55 @@ class NodeMessagingClient(override val config: NodeConfiguration, "sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}" } producer!!.send(mqAddress, artemisMessage) + + retryId?.let { + database.transaction { + messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) }) + } + scheduledMessageRedeliveries[it] = messagingExecutor.schedule({ + sendWithRetry(0, mqAddress, artemisMessage, it) + }, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS) + + } } } } + private fun sendWithRetry(retryCount: Int, address: String, message: ClientMessage, retryId: Long) { + fun randomiseDuplicateId(message: ClientMessage) { + message.putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + + log.trace { "Attempting to retry #$retryCount message delivery for $retryId" } + if (retryCount >= messageMaxRetryCount) { + log.warn("Reached the maximum number of retries ($messageMaxRetryCount) for message $message redelivery to $address") + scheduledMessageRedeliveries.remove(retryId) + return + } + + randomiseDuplicateId(message) + + state.locked { + log.trace { "Retry #$retryCount sending message $message to $address for $retryId" } + producer!!.send(address, message) + } + + scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({ + sendWithRetry(retryCount + 1, address, message, retryId) + }, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS) + } + + override fun cancelRedelivery(retryId: Long) { + database.transaction { + messagesToRedeliver.remove(retryId) + } + scheduledMessageRedeliveries[retryId]?.let { + log.trace { "Cancelling message redelivery for retry id $retryId" } + if (!it.isDone) it.cancel(true) + scheduledMessageRedeliveries.remove(retryId) + } + } + private fun getMQAddress(target: MessageRecipients): String { return if (target == myAddress) { // If we are sending to ourselves then route the message directly to our P2P queue. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt index b0ee2906d3..b05b8a0f4f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt @@ -6,11 +6,17 @@ import net.corda.node.services.statemachine.FlowSessionState.Initiated import net.corda.node.services.statemachine.FlowSessionState.Initiating import java.util.concurrent.ConcurrentLinkedQueue +/** + * @param retryable Indicates that the session initialisation should be retried until an expected [SessionData] response + * is received. Note that this requires the party on the other end to be a distributed service and run an idempotent flow + * that only sends back a single [SessionData] message before termination. + */ class FlowSession( val flow: FlowLogic<*>, val ourSessionId: Long, val initiatingParty: Party?, - var state: FlowSessionState) { + var state: FlowSessionState, + val retryable: Boolean = false) { val receivedMessages = ConcurrentLinkedQueue>() val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 081e239e77..d7d6073766 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -174,11 +174,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, override fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any, - sessionFlow: FlowLogic<*>): UntrustworthyData { + sessionFlow: FlowLogic<*>, + retrySend: Boolean): UntrustworthyData { logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." } val session = getConfirmedSession(otherParty, sessionFlow) val sessionData = if (session == null) { - val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true) + val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend) // Only do a receive here as the session init has carried the payload receiveInternal(newSession, receiveType) } else { @@ -292,9 +293,13 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * multiple public keys, but we **don't support multiple nodes advertising the same legal identity**. */ @Suspendable - private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession { + private fun startNewSession(otherParty: Party, + sessionFlow: FlowLogic<*>, + firstPayload: Any?, + waitForConfirmation: Boolean, + retryable: Boolean = false): FlowSession { logger.trace { "Initiating a new session with $otherParty" } - val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty)) + val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable) openSessions[Pair(sessionFlow, otherParty)] = session // We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class val clientFlowClass = sessionFlow.topConcreteFlowClass diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index a43228337c..c01f905f31 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -297,6 +297,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val session = openSessions[message.recipientSessionId] if (session != null) { session.fiber.logger.trace { "Received $message on $session from $sender" } + if (session.retryable) { + if (message is SessionConfirm && session.state is FlowSessionState.Initiated) { + session.fiber.logger.trace { "Ignoring duplicate confirmation for session ${session.ourSessionId} – session is idempotent" } + return + } + if (message !is SessionConfirm) { + serviceHub.networkService.cancelRedelivery(session.ourSessionId) + } + } if (message is SessionEnd) { openSessions.remove(message.recipientSessionId) } @@ -319,7 +328,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, logger.trace { "Ignoring session end message for already closed session: $message" } } } else { - logger.warn("Received a session message for unknown session: $message") + logger.warn("Received a session message for unknown session: $message, from $sender") } } } @@ -523,42 +532,52 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, private fun processIORequest(ioRequest: FlowIORequest) { executor.checkOnThread() - if (ioRequest is SendRequest) { - if (ioRequest.message is SessionInit) { - openSessions[ioRequest.session.ourSessionId] = ioRequest.session + when (ioRequest) { + is SendRequest -> processSendRequest(ioRequest) + is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest) + } + } + + private fun processSendRequest(ioRequest: SendRequest) { + val retryId = if (ioRequest.message is SessionInit) { + with(ioRequest.session) { + openSessions[ourSessionId] = this + if (retryable) ourSessionId else null } - sendSessionMessage(ioRequest.session.state.sendToParty, ioRequest.message, ioRequest.session.fiber) - if (ioRequest !is ReceiveRequest<*>) { - // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. - resumeFiber(ioRequest.session.fiber) - } - } else if (ioRequest is WaitForLedgerCommit) { - // Is it already committed? - val stx = database.transaction { - serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash) - } - if (stx != null) { - resumeFiber(ioRequest.fiber) - } else { - // No, then register to wait. - // - // We assume this code runs on the server thread, which is the only place transactions are committed - // currently. When we liberalise our threading somewhat, handing of wait requests will need to be - // reworked to make the wait atomic in another way. Otherwise there is a race between checking the - // database and updating the waiting list. - mutex.locked { - fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber - } + } else null + sendSessionMessage(ioRequest.session.state.sendToParty, ioRequest.message, ioRequest.session.fiber, retryId) + if (ioRequest !is ReceiveRequest<*>) { + // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. + resumeFiber(ioRequest.session.fiber) + } + } + + private fun processWaitForCommitRequest(ioRequest: WaitForLedgerCommit) { + // Is it already committed? + val stx = database.transaction { + serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash) + } + if (stx != null) { + resumeFiber(ioRequest.fiber) + } else { + // No, then register to wait. + // + // We assume this code runs on the server thread, which is the only place transactions are committed + // currently. When we liberalise our threading somewhat, handing of wait requests will need to be + // reworked to make the wait atomic in another way. Otherwise there is a race between checking the + // database and updating the waiting list. + mutex.locked { + fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber } } } - private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null) { + private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) { val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about party $party") val address = serviceHub.networkService.getAddressOfParty(partyInfo) val logger = fiber?.logger ?: logger - logger.trace { "Sending $message to party $party @ $address" } - serviceHub.networkService.send(sessionTopic, message, address) + logger.trace { "Sending $message to party $party @ $address" + if (retryId != null) " with retry $retryId" else "" } + serviceHub.networkService.send(sessionTopic, message, address, retryId = retryId) } } diff --git a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt index dad99767ac..29a636e215 100644 --- a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt +++ b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt @@ -72,7 +72,7 @@ class InteractiveShellTest { fun party() = check("party: \"${someCorpLegalName}\"", someCorpLegalName.toString()) class DummyFSM(val logic: FlowA) : FlowStateMachine { - override fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>): UntrustworthyData { + override fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData { throw UnsupportedOperationException("not implemented") } diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 3d43354846..6df145eac2 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -169,8 +169,8 @@ data class TestNodeConfiguration( override val devMode: Boolean = true, override val certificateSigningService: URL = URL("http://localhost"), override val certificateChainCheckPolicies: List = emptyList(), - override val verifierType: VerifierType = VerifierType.InMemory) : NodeConfiguration { -} + override val verifierType: VerifierType = VerifierType.InMemory, + override val messageRedeliveryDelaySeconds: Int = 5) : NodeConfiguration fun testConfiguration(baseDirectory: Path, legalName: String, basePort: Int): FullNodeConfiguration { return FullNodeConfiguration( diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 59a76536eb..b67c51d47b 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -359,7 +359,7 @@ class InMemoryMessagingNetwork( state.locked { check(handlers.remove(registration as Handler)) } } - override fun send(message: Message, target: MessageRecipients) { + override fun send(message: Message, target: MessageRecipients, retryId: Long?) { check(running) msgSend(this, message, target) if (!sendManuallyPumped) { @@ -376,6 +376,8 @@ class InMemoryMessagingNetwork( netNodeHasShutdown(peerHandle) } + override fun cancelRedelivery(retryId: Long) {} + /** Returns the given (topic & session, data) pair as a newly created message object. */ override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message { return InMemoryMessage(topicSession, data, uuid)