diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 87874e2381..2b5b9d7bda 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2225,7 +2225,7 @@ public final class net.corda.core.flows.NotaryFlow extends java.lang.Object ## @DoNotImplement @InitiatingFlow -public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic +public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic implements net.corda.core.internal.TimedFlow public (net.corda.core.transactions.SignedTransaction) public (net.corda.core.transactions.SignedTransaction, net.corda.core.utilities.ProgressTracker) @Suspendable diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index be86932ec9..babcc728fd 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -17,6 +17,7 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.TransactionSignature import net.corda.core.identity.Party import net.corda.core.internal.FetchDataFlow +import net.corda.core.internal.TimedFlow import net.corda.core.internal.notary.generateSignature import net.corda.core.internal.notary.validateSignatures import net.corda.core.internal.pushToLoggingContext @@ -41,8 +42,10 @@ class NotaryFlow { */ @DoNotImplement @InitiatingFlow - open class Client(private val stx: SignedTransaction, - override val progressTracker: ProgressTracker) : FlowLogic>() { + open class Client( + private val stx: SignedTransaction, + override val progressTracker: ProgressTracker + ) : FlowLogic>(), TimedFlow { constructor(stx: SignedTransaction) : this(stx, tracker()) companion object { diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt index 620a649712..e770327c02 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt @@ -29,13 +29,9 @@ sealed class FlowIORequest { * @property shouldRetrySend specifies whether the send should be retried. */ data class Send( - val sessionToMessage: Map>, - val shouldRetrySend: Boolean + val sessionToMessage: Map> ) : FlowIORequest() { - override fun toString() = "Send(" + - "sessionToMessage=${sessionToMessage.mapValues { it.value.hash }}, " + - "shouldRetrySend=$shouldRetrySend" + - ")" + override fun toString() = "Send(sessionToMessage=${sessionToMessage.mapValues { it.value.hash }})" } /** diff --git a/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt new file mode 100644 index 0000000000..22d7f1c681 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt @@ -0,0 +1,23 @@ +package net.corda.core.internal + +/** + * A marker for a flow that will return the same result if replayed from the beginning. Any side effects the flow causes + * must also be idempotent. + * + * Flow idempotency allows skipping persisting checkpoints, allowing better performance. + */ +interface IdempotentFlow + +/** + * An idempotent flow that needs to be replayed if it does not complete within a certain timeout. + * + * Example use would be the notary client flow: if the client sends a request to an HA notary cluster, it will get + * accepted by one of the cluster members, but the member might crash before returning a response. The client flow + * would be stuck waiting for that member to come back up. Retrying the notary flow will re-send the request to the + * next available notary cluster member. + * + * Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints + * persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow]. + */ +// TODO: allow specifying retry settings per flow +interface TimedFlow : IdempotentFlow \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt new file mode 100644 index 0000000000..180d2024af --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -0,0 +1,189 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.* +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.ResolveTransactionsFlow +import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.node.AppServiceHub +import net.corda.core.node.NotaryInfo +import net.corda.core.node.services.CordaService +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.seconds +import net.corda.node.internal.StartedNode +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.NotaryConfig +import net.corda.node.services.config.P2PMessagingRetryConfiguration +import net.corda.nodeapi.internal.DevIdentityGenerator +import net.corda.nodeapi.internal.network.NetworkParametersCopier +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.dummyCommand +import net.corda.testing.core.singleIdentity +import net.corda.testing.internal.LogHelper +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetworkParameters +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.startFlow +import org.junit.AfterClass +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.slf4j.MDC +import java.security.PublicKey +import java.util.concurrent.atomic.AtomicInteger + +class TimedFlowTests { + companion object { + /** The notary nodes don't run any consensus protocol, so 2 nodes are sufficient for the purpose of this test. */ + private const val CLUSTER_SIZE = 2 + /** A shared counter across all notary service nodes. */ + var requestsReceived: AtomicInteger = AtomicInteger(0) + + private lateinit var mockNet: InternalMockNetwork + private lateinit var notary: Party + private lateinit var node: StartedNode + + init { + LogHelper.setLevel("+net.corda.flow", "+net.corda.testing.node", "+net.corda.node.services.messaging") + } + + @BeforeClass + @JvmStatic + fun setup() { + mockNet = InternalMockNetwork( + listOf("net.corda.testing.contracts", "net.corda.node.services"), + MockNetworkParameters().withServicePeerAllocationStrategy(InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()), + threadPerNode = true + ) + val started = startClusterAndNode(mockNet) + notary = started.first + node = started.second + } + + @AfterClass + @JvmStatic + fun stopNodes() { + mockNet.stopNodes() + } + + private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair> { + val replicaIds = (0 until CLUSTER_SIZE) + val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( + replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, + CordaX500Name("Custom Notary", "Zurich", "CH")) + + val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true)))) + val notaryConfig = mock { + whenever(it.custom).thenReturn(true) + whenever(it.isClusterConfig).thenReturn(true) + whenever(it.validating).thenReturn(true) + } + + val notaryNodes = (0 until CLUSTER_SIZE).map { + mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = { + doReturn(notaryConfig).whenever(it).notary + })) + } + + val aliceNode = mockNet.createUnstartedNode( + InternalMockNodeParameters( + legalName = CordaX500Name("Alice", "AliceCorp", "GB"), + configOverrides = { conf: NodeConfiguration -> + val retryConfig = P2PMessagingRetryConfiguration(1.seconds, 3, 1.0) + doReturn(retryConfig).whenever(conf).p2pMessagingRetry + } + ) + ) + + // MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the + // network-parameters in their directories before they're started. + val node = (notaryNodes + aliceNode).map { node -> + networkParameters.install(mockNet.baseDirectory(node.id)) + node.start() + }.last() + + return Pair(notaryIdentity, node) + } + } + + @Before + fun resetCounter() { + requestsReceived = AtomicInteger(0) + } + + @Test + fun `timed flows are restarted`() { + node.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val flow = NotaryFlow.Client(issueTx) + val notarySignatures = services.startFlow(flow).resultFuture.get() + (issueTx + notarySignatures).verifyRequiredSignatures() + } + } + + @Test + fun `timed sub-flows are restarted`() { + node.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val flow = FinalityFlow(issueTx) + val stx = services.startFlow(flow).resultFuture.get() + stx.verifyRequiredSignatures() + } + } + + private fun StartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { + return services.signInitialTransaction( + TransactionBuilder(notary).apply { + addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey)) + block() + } + ) + } + + @CordaService + private class TestNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { + override val uniquenessProvider = mock() + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = TestNotaryFlow(otherPartySession, this) + override fun start() {} + override fun stop() {} + } + + /** A notary flow that will yield without returning a response on the very first received request. */ + private class TestNotaryFlow(otherSide: FlowSession, service: TestNotaryService) : NotaryServiceFlow(otherSide, service) { + @Suspendable + override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts { + val myIdentity = serviceHub.myInfo.legalIdentities.first() + MDC.put("name", myIdentity.name.toString()) + logger.info("Received a request from ${otherSideSession.counterparty.name}") + val stx = requestPayload.signedTransaction + subFlow(ResolveTransactionsFlow(stx, otherSideSession)) + + if (TimedFlowTests.requestsReceived.getAndIncrement() == 0) { + logger.info("Ignoring") + // Waiting forever + stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false) + } else { + logger.info("Processing") + } + return TransactionParts(stx.id, stx.inputs, stx.tx.timeWindow, stx.notary) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index ad1176bd15..7aa53e788c 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -12,9 +12,7 @@ package net.corda.services.messaging import net.corda.core.concurrent.CordaFuture import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.randomOrNull import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable @@ -33,7 +31,6 @@ import net.corda.testing.driver.driver import net.corda.testing.driver.internal.internalServices import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas -import net.corda.testing.internal.chooseIdentity import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.node.ClusterSpec import net.corda.testing.node.NotarySpec @@ -41,10 +38,7 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.ClassRule import org.junit.Test import java.util.* -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger class P2PMessagingTest : IntegrationTest() { private companion object { @@ -62,73 +56,6 @@ class P2PMessagingTest : IntegrationTest() { } } - - @Test - fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() { - startDriverWithDistributedService { distributedServiceNodes -> - val alice = startAlice() - val serviceAddress = alice.services.networkMapCache.run { - val notaryParty = notaryIdentities.randomOrNull()!! - alice.internalServices.networkService.getAddressOfParty(getPartyInfo(notaryParty)!!) - } - - val responseMessage = "response" - - val crashingNodes = simulateCrashingNodes(distributedServiceNodes, responseMessage) - - // Send a single request with retry - val responseFuture = alice.receiveFrom(serviceAddress, retryId = 0) - crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) - // The request wasn't successful. - assertThat(responseFuture.isDone).isFalse() - crashingNodes.ignoreRequests = false - - // The retry should be successful. - val response = responseFuture.getOrThrow(10.seconds) - assertThat(response).isEqualTo(responseMessage) - } - } - - @Test - fun `distributed service request retries are persisted across client node restarts`() { - startDriverWithDistributedService { distributedServiceNodes -> - val alice = startAlice() - val serviceAddress = alice.services.networkMapCache.run { - val notaryParty = notaryIdentities.randomOrNull()!! - alice.internalServices.networkService.getAddressOfParty(getPartyInfo(notaryParty)!!) - } - - val responseMessage = "response" - - val crashingNodes = simulateCrashingNodes(distributedServiceNodes, responseMessage) - - // Send a single request with retry - alice.receiveFrom(serviceAddress, retryId = 0) - - // Wait until the first request is received - crashingNodes.firstRequestReceived.await() - // Stop alice's node after we ensured that the first request was delivered and ignored. - alice.stop() - val numberOfRequestsReceived = crashingNodes.requestsReceived.get() - assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) - - crashingNodes.ignoreRequests = false - - // Restart the node and expect a response - val aliceRestarted = startAlice() - - val responseFuture = openFuture() - aliceRestarted.internalServices.networkService.runOnNextMessage("test.response") { - responseFuture.set(it.data.deserialize()) - } - val response = responseFuture.getOrThrow() - - assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) - assertThat(response).isEqualTo(responseMessage) - } - } - - private fun startDriverWithDistributedService(dsl: DriverDSL.(List) -> Unit) { driver(DriverParameters( startNodesInProcess = true, @@ -138,55 +65,6 @@ class P2PMessagingTest : IntegrationTest() { } } - private fun DriverDSL.startAlice(): InProcess { - return startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to mapOf( - "messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3))) - .map { (it as InProcess) } - .getOrThrow() - } - - data class CrashingNodes( - val firstRequestReceived: CountDownLatch, - val requestsReceived: AtomicInteger, - var ignoreRequests: Boolean - ) - - /** - * Sets up the [distributedServiceNodes] to respond to "test.request" requests. All nodes will receive requests and - * either ignore them or respond to "test.response", depending on the value of [CrashingNodes.ignoreRequests], - * initially set to true. This may be used to simulate scenarios where nodes receive request messages but crash - * before sending back a response. - */ - private fun simulateCrashingNodes(distributedServiceNodes: List, responseMessage: String): CrashingNodes { - val crashingNodes = CrashingNodes( - requestsReceived = AtomicInteger(0), - firstRequestReceived = CountDownLatch(1), - ignoreRequests = true - ) - - distributedServiceNodes.forEach { - val nodeName = it.services.myInfo.chooseIdentity().name - it.internalServices.networkService.addMessageHandler("test.request") { netMessage, _, handler -> - crashingNodes.requestsReceived.incrementAndGet() - crashingNodes.firstRequestReceived.countDown() - // The node which receives the first request will ignore all requests - print("$nodeName: Received request - ") - if (crashingNodes.ignoreRequests) { - println("ignoring") - // Requests are ignored to simulate a service node crashing before sending back a response. - // A retry by the client will result in the message being redelivered to another node in the service cluster. - } else { - println("sending response") - val request = netMessage.data.deserialize() - val response = it.internalServices.networkService.createMessage("test.response", responseMessage.serialize().bytes) - it.internalServices.networkService.send(response, request.replyTo) - } - handler.afterDatabaseTransaction() - } - } - return crashingNodes - } - private fun assertAllNodesAreUsed(participatingServiceNodes: List, serviceName: CordaX500Name, originatingNode: InProcess) { // Setup each node in the distributed service to return back it's NodeInfo so that we can know which node is being used participatingServiceNodes.forEach { node -> @@ -217,12 +95,12 @@ class P2PMessagingTest : IntegrationTest() { } } - private fun InProcess.receiveFrom(target: MessageRecipients, retryId: Long? = null): CordaFuture { + private fun InProcess.receiveFrom(target: MessageRecipients): CordaFuture { val response = openFuture() internalServices.networkService.runOnNextMessage("test.response") { netMessage -> response.set(netMessage.data.deserialize()) } - internalServices.networkService.send("test.request", TestRequest(replyTo = internalServices.networkService.myAddress), target, retryId = retryId) + internalServices.networkService.send("test.request", TestRequest(replyTo = internalServices.networkService.myAddress), target) return response } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index a0311c0912..e007f1848f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -74,8 +74,6 @@ interface MessagingService { * There is no way to know if a message has been received. If your flow requires this, you need the recipient * to send an ACK message back. * - * @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id. - * Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary. * @param sequenceKey an object that may be used to enable a parallel [MessagingService] implementation. Two * subsequent send()s with the same [sequenceKey] (up to equality) are guaranteed to be delivered in the same * sequence the send()s were called. By default this is chosen conservatively to be [target]. @@ -84,7 +82,6 @@ interface MessagingService { fun send( message: Message, target: MessageRecipients, - retryId: Long? = null, sequenceKey: Any = target ) @@ -92,7 +89,6 @@ interface MessagingService { data class AddressedMessage( val message: Message, val target: MessageRecipients, - val retryId: Long? = null, val sequenceKey: Any = target ) @@ -105,9 +101,6 @@ interface MessagingService { @Suspendable fun send(addressedMessages: List) - /** Cancels the scheduled message redelivery for the specified [retryId] */ - fun cancelRedelivery(retryId: Long) - /** * Returns an initialised [Message] with the current time, etc, already filled in. * @@ -125,7 +118,7 @@ interface MessagingService { val myAddress: SingleMessageRecipient } -fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), retryId: Long? = null, additionalHeaders: Map = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to, retryId) +fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), additionalHeaders: Map = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to) interface MessageHandlerRegistration diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 24492ed6c3..3261d52ed4 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -26,7 +26,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -37,43 +41,41 @@ import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.utilities.AffinityExecutor -import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.requireMessageSize import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.* -import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Observable import rx.Subscription import rx.subjects.PublishSubject -import java.io.Serializable import java.security.PublicKey import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit import javax.annotation.concurrent.ThreadSafe -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Id -import javax.persistence.Lob /** * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product. @@ -119,35 +121,12 @@ class P2PMessagingClient(val config: NodeConfiguration, companion object { private val log = contextLogger() - fun createMessageToRedeliver(): PersistentMap, RetryMessage, Long> { - return PersistentMap( - toPersistentEntityKey = { it }, - fromPersistentEntity = { - Pair(it.key, - Pair(it.message.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), - it.recipients.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)) - ) - }, - toPersistentEntity = { _key: Long, (_message: Message, _recipient: MessageRecipients): Pair -> - RetryMessage().apply { - key = _key - message = _message.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes - recipients = _recipient.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes - } - }, - persistentEntityClass = RetryMessage::class.java - ) - } - class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val additionalHeaders: Map) : Message { override val debugTimestamp: Instant = Instant.now() override fun toString() = "$topic#${String(data.bytes)}" } } - private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount - private val backoffBase: Double = config.p2pMessagingRetry.backoffBase - private class InnerState { var started = false var running = false @@ -163,17 +142,12 @@ class P2PMessagingClient(val config: NodeConfiguration, fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) } - private val messagesToRedeliver = createMessageToRedeliver() - - private val scheduledMessageRedeliveries = ConcurrentHashMap>() - /** A registration to handle messages of different types */ data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) override val ourSenderUUID = UUID.randomUUID().toString() - private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds private val state = ThreadBox(InnerState()) private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap()) @@ -184,21 +158,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val deduplicator = P2PMessageDeduplicator(database) internal var messagingExecutor: MessagingExecutor? = null - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry") - class RetryMessage( - @Id - @Column(name = "message_id", length = 64, nullable = false) - var key: Long = 0, - - @Lob - @Column(nullable = false) - var message: ByteArray = EMPTY_BYTE_ARRAY, - @Lob - @Column(nullable = false) - var recipients: ByteArray = EMPTY_BYTE_ARRAY - ) : Serializable - fun start() { state.locked { started = true @@ -255,8 +214,6 @@ class P2PMessagingClient(val config: NodeConfiguration, registerBridgeControl(bridgeSession!!, inboxes.toList()) enumerateBridges(bridgeSession!!, inboxes.toList()) } - - resumeMessageRedelivery() } private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { @@ -355,12 +312,6 @@ class P2PMessagingClient(val config: NodeConfiguration, sendBridgeControl(startupMessage) } - private fun resumeMessageRedelivery() { - messagesToRedeliver.forEach { retryId, (message, target) -> - send(message, target, retryId) - } - } - private val shutdownLatch = CountDownLatch(1) /** @@ -534,53 +485,15 @@ class P2PMessagingClient(val config: NodeConfiguration, override fun close() = stop() @Suspendable - override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { + override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { requireMessageSize(message.data.size, maxMessageSize) messagingExecutor!!.send(message, target) - retryId?.let { - database.transaction { - messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) }) - } - scheduledMessageRedeliveries[it] = nodeExecutor.schedule({ - sendWithRetry(0, message, target, retryId) - }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) - } } @Suspendable override fun send(addressedMessages: List) { - for ((message, target, retryId, sequenceKey) in addressedMessages) { - send(message, target, retryId, sequenceKey) - } - } - - private fun sendWithRetry(retryCount: Int, message: Message, target: MessageRecipients, retryId: Long) { - log.trace { "Attempting to retry #$retryCount message delivery for $retryId" } - if (retryCount >= messageMaxRetryCount) { - log.warn("Reached the maximum number of retries ($messageMaxRetryCount) for message $message redelivery to $target") - scheduledMessageRedeliveries.remove(retryId) - return - } - - val messageWithRetryCount = object : Message by message { - override val uniqueMessageId = DeduplicationId("${message.uniqueMessageId.toString}-$retryCount") - } - - messagingExecutor!!.send(messageWithRetryCount, target) - - scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({ - sendWithRetry(retryCount + 1, message, target, retryId) - }, messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble()).toLong(), TimeUnit.SECONDS) - } - - override fun cancelRedelivery(retryId: Long) { - database.transaction { - messagesToRedeliver.remove(retryId) - } - scheduledMessageRedeliveries[retryId]?.let { - log.trace { "Cancelling message redelivery for retry id $retryId" } - if (!it.isDone) it.cancel(true) - scheduledMessageRedeliveries.remove(retryId) + for ((message, target, sequenceKey) in addressedMessages) { + send(message, target, sequenceKey) } } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index cadc427d50..34f036631b 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -13,20 +13,23 @@ package net.corda.node.services.schema import net.corda.core.contracts.ContractState import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.LinearState -import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.core.schemas.CommonSchemaV1 import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.node.services.api.SchemaService import net.corda.node.services.api.SchemaService.SchemaOptions import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.P2PMessageDeduplicator -import net.corda.node.services.messaging.P2PMessagingClient -import net.corda.node.services.persistence.* +import net.corda.node.services.persistence.DBCheckpointStorage +import net.corda.node.services.persistence.DBTransactionMappingStorage +import net.corda.node.services.persistence.DBTransactionStorage +import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.RunOnceService import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider @@ -52,7 +55,6 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java, P2PMessageDeduplicator.ProcessedMessage::class.java, - P2PMessagingClient.RetryMessage::class.java, PersistentIdentityService.PersistentIdentity::class.java, PersistentIdentityService.PersistentIdentityNames::class.java, ContractUpgradeServiceImpl.DBContractUpgrade::class.java, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index 9c11c4bc34..a91a1d17c3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -145,6 +145,19 @@ sealed class Action { * Retry a flow from the last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details. */ data class RetryFlowFromSafePoint(val currentState: StateMachineState) : Action() + + /** + * Schedule the flow [flowId] to be retried if it does not complete within the timeout period specified in the configuration. + * + * Note that this only works with [TimedFlow]. + */ + data class ScheduleFlowTimeout(val flowId: StateMachineRunId) : Action() + + /** + * Cancel the retry timeout for flow [flowId]. This must be called when a timed flow completes to prevent + * unnecessary additional invocations. + */ + data class CancelFlowTimeout(val flowId: StateMachineRunId) : Action() } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 6f5f57ad5a..5ae8d3171c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -84,9 +84,10 @@ class ActionExecutorImpl( is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action) is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action) is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action) + is Action.ScheduleFlowTimeout -> scheduleFlowTimeout(action) + is Action.CancelFlowTimeout -> cancelFlowTimeout(action) } } - private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) { if (action.uuid != null) services.vaultService.softLockRelease(action.uuid) } @@ -244,4 +245,12 @@ class ActionExecutorImpl( private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes { return checkpoint.serialize(context = checkpointSerializationContext) } + + private fun cancelFlowTimeout(action: Action.CancelFlowTimeout) { + stateMachineManager.cancelFlowTimeout(action.flowId) + } + + private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) { + stateMachineManager.scheduleFlowTimeout(action.flowId) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/DeduplicationId.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/DeduplicationId.kt index a642321056..8ca4c51ae8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/DeduplicationId.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/DeduplicationId.kt @@ -28,16 +28,16 @@ data class DeduplicationId(val toString: String) { * creating IDs in case the message-generating flow logic is replayed on hard failure. * * A normal deduplication ID consists of: - * 1. A deduplication seed set per flow. This is either the flow's ID or in case of an initated flow the - * initiator's session ID. + * 1. A deduplication seed set per session. This is the initiator's session ID, with a prefix for initiator + * or initiated. * 2. The number of *clean* suspends since the start of the flow. * 3. An optional additional index, for cases where several messages are sent as part of the state transition. * Note that care must be taken with this index, it must be a deterministic counter. For example a naive * iteration over a HashMap will produce a different list of indeces than a previous run, causing the * message-id map to change, which means deduplication will not happen correctly. */ - fun createForNormal(checkpoint: Checkpoint, index: Int): DeduplicationId { - return DeduplicationId("N-${checkpoint.deduplicationSeed}-${checkpoint.numberOfSuspends}-$index") + fun createForNormal(checkpoint: Checkpoint, index: Int, session: SessionState): DeduplicationId { + return DeduplicationId("N-${session.deduplicationSeed}-${checkpoint.numberOfSuspends}-$index") } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 8bae2cc972..f47728211c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -83,8 +83,7 @@ class FlowSessionImpl( @Suspendable override fun send(payload: Any, maySkipCheckpoint: Boolean) { val request = FlowIORequest.Send( - sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)), - shouldRetrySend = false + sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)) ) return getFlowStateMachine().suspend(request, maySkipCheckpoint) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index e9da60308e..144ea62d26 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -148,7 +148,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val transitionExecutor = getTransientField(TransientValues::transitionExecutor) val eventQueue = getTransientField(TransientValues::eventQueue) try { - eventLoop@while (true) { + eventLoop@ while (true) { val nextEvent = eventQueue.receive() val continuation = processEvent(transitionExecutor, nextEvent) when (continuation) { @@ -336,11 +336,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, parkAndSerialize { _, _ -> logger.trace { "Suspended on $ioRequest" } + // Will skip checkpoint if there are any idempotent flows in the subflow stack. + val skipPersistingCheckpoint = containsIdempotentFlows() || maySkipCheckpoint + contextTransactionOrNull = transaction.value val event = try { Event.Suspend( ioRequest = ioRequest, - maySkipCheckpoint = maySkipCheckpoint, + maySkipCheckpoint = skipPersistingCheckpoint, fiber = this.serialize(context = serializationContext.value) ) } catch (throwable: Throwable) { @@ -364,6 +367,11 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, )) } + private fun containsIdempotentFlows(): Boolean { + val subFlowStack = snapshot().checkpoint.subFlowStack + return subFlowStack.any { IdempotentFlow::class.java.isAssignableFrom(it.flowClass) } + } + @Suspendable override fun scheduleEvent(event: Event) { getTransientField(TransientValues::eventQueue).send(event) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowTimeoutException.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowTimeoutException.kt new file mode 100644 index 0000000000..cb9ab23624 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowTimeoutException.kt @@ -0,0 +1,9 @@ +package net.corda.node.services.statemachine + +import net.corda.core.CordaException + +/** + * This exception is fired once the retry timeout of a [TimedFlow] expires. + * It will indicate to the flow hospital to restart the flow. + */ +data class FlowTimeoutException(val maxRetries: Int) : CordaException("replaying flow from the last checkpoint") \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 73c3490a60..3a6bdb8b0c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -18,25 +18,17 @@ import co.paralleluniverse.strands.channels.Channels import com.codahale.metrics.Gauge import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext -import net.corda.core.context.InvocationOrigin import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.ThreadBox -import net.corda.core.internal.bufferUntilSubscribed -import net.corda.core.internal.castIfPossible +import net.corda.core.internal.* import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.core.serialization.SerializedBytes -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize +import net.corda.core.serialization.* import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger @@ -49,11 +41,6 @@ import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion import net.corda.node.services.statemachine.interceptors.* -import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor -import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker -import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor -import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor -import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -65,11 +52,11 @@ import rx.Observable import rx.subjects.PublishSubject import java.security.SecureRandom import java.util.* -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ExecutorService +import java.util.concurrent.* import java.util.concurrent.locks.ReentrantLock import javax.annotation.concurrent.ThreadSafe import kotlin.collections.ArrayList +import kotlin.collections.HashMap import kotlin.concurrent.withLock import kotlin.streams.toList @@ -93,18 +80,28 @@ class SingleThreadedStateMachineManager( private class Flow(val fiber: FlowStateMachineImpl<*>, val resultFuture: OpenFuture) + private data class ScheduledTimeout( + /** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */ + val scheduledFuture: ScheduledFuture<*>, + /** Specifies the number of times this flow has been retried. */ + val retryCount: Int = 0 + ) + // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. private class InnerState { val changesPublisher = PublishSubject.create()!! - // True if we're shutting down, so don't resume anything. + /** True if we're shutting down, so don't resume anything. */ var stopping = false val flows = HashMap() val startedFutures = HashMap>() + /** Flows scheduled to be retried if not finished within the specified timeout period. */ + val timedFlows = HashMap() } private val mutex = ThreadBox(InnerState()) private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) + private val timeoutScheduler = Executors.newScheduledThreadPool(1) // How many Fibers are running and not suspended. If zero and stopping is true, then we are halted. private val liveFibers = ReusableLatch() // Monitoring support. @@ -219,8 +216,8 @@ class SingleThreadedStateMachineManager( } override fun killFlow(id: StateMachineRunId): Boolean { - return mutex.locked { + cancelTimeoutIfScheduled(id) val flow = flows.remove(id) if (flow != null) { logger.debug("Killing flow known to physical node.") @@ -272,6 +269,7 @@ class SingleThreadedStateMachineManager( override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) { mutex.locked { + cancelTimeoutIfScheduled(flowId) val flow = flows.remove(flowId) if (flow != null) { decrementLiveFibers() @@ -436,10 +434,11 @@ class SingleThreadedStateMachineManager( "unknown session $recipientId, discarding..." } } else { - throw IllegalArgumentException("Cannot find flow corresponding to session ID $recipientId") + logger.warn("Cannot find flow corresponding to session ID $recipientId.") } } else { - val flow = mutex.locked { flows[flowId] } ?: throw IllegalStateException("Cannot find fiber corresponding to ID $flowId") + val flow = mutex.locked { flows[flowId] } + ?: throw IllegalStateException("Cannot find fiber corresponding to ID $flowId") flow.fiber.scheduleEvent(Event.DeliverSessionMessage(sessionMessage, deduplicationHandler, sender)) } } catch (exception: Exception) { @@ -454,6 +453,7 @@ class SingleThreadedStateMachineManager( val payload = RejectSessionMessage(message, errorId) return ExistingSessionMessage(initiatorSessionId, payload) } + val replyError = try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) @@ -496,8 +496,8 @@ class SingleThreadedStateMachineManager( } catch (e: ClassCastException) { throw SessionRejectException("${message.initiatorFlowClassName} is not a flow") } - return serviceHub.getFlowFactory(initiatingFlowClass) ?: - throw SessionRejectException("$initiatingFlowClass is not registered") + return serviceHub.getFlowFactory(initiatingFlowClass) + ?: throw SessionRejectException("$initiatingFlowClass is not registered") } private fun startInitiatedFlow( @@ -542,7 +542,7 @@ class SingleThreadedStateMachineManager( flowLogic.stateMachine = flowStateMachineImpl val frozenFlowLogic = (flowLogic as FlowLogic<*>).serialize(context = checkpointSerializationContext!!) - val flowCorDappVersion= createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion) + val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion) val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed, flowCorDappVersion).getOrThrow() val startedFuture = openFuture() @@ -566,6 +566,59 @@ class SingleThreadedStateMachineManager( return startedFuture.map { flowStateMachineImpl as FlowStateMachine } } + override fun scheduleFlowTimeout(flowId: StateMachineRunId) { + mutex.locked { scheduleTimeout(flowId) } + } + + override fun cancelFlowTimeout(flowId: StateMachineRunId) { + mutex.locked { cancelTimeoutIfScheduled(flowId) } + } + + /** + * Schedules the flow [flowId] to be retried if it does not finish within the timeout period + * specified in the config. + * + * Assumes lock is taken on the [InnerState]. + */ + private fun InnerState.scheduleTimeout(flowId: StateMachineRunId) { + val flow = flows[flowId] + if (flow != null) { + val scheduledTimeout = timedFlows[flowId] + val retryCount = if (scheduledTimeout != null) { + val timeoutFuture = scheduledTimeout.scheduledFuture + if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) + scheduledTimeout.retryCount + } else 0 + val scheduledFuture = scheduleTimeoutException(flow, retryCount) + timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1) + } else { + logger.warn("Unable to schedule timeout for flow $flowId – flow not found.") + } + } + + /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ + private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { + return with(serviceHub.configuration.p2pMessagingRetry) { + val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + timeoutScheduler.schedule({ + val event = Event.Error(FlowTimeoutException(maxRetryCount)) + flow.fiber.scheduleEvent(event) + }, timeoutDelaySeconds, TimeUnit.SECONDS) + } + } + + /** + * Cancels any scheduled flow timeout for [flowId]. + * + * Assumes lock is taken on the [InnerState]. + */ + private fun InnerState.cancelTimeoutIfScheduled(flowId: StateMachineRunId) { + timedFlows[flowId]?.let { (future, _) -> + if (!future.isDone) future.cancel(true) + timedFlows.remove(flowId) + } + } + private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes): Checkpoint? { return try { serializedCheckpoint.deserialize(context = checkpointSerializationContext!!) @@ -673,6 +726,8 @@ class SingleThreadedStateMachineManager( } else { oldFlow.resultFuture.captureLater(flow.resultFuture) } + val flowLogic = flow.fiber.logic + if (flowLogic is TimedFlow) scheduleTimeout(id) flow.fiber.scheduleEvent(Event.DoRemainingWork) when (checkpoint.flowState) { is FlowState.Unstarted -> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index b0fb7943f0..3f66c15c4b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.TimedFlow import net.corda.core.utilities.loggerFor import java.sql.SQLException import java.time.Instant @@ -12,7 +13,7 @@ import java.util.concurrent.ConcurrentHashMap object StaffedFlowHospital : FlowHospital { private val log = loggerFor() - private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist) + private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout) private val patients = ConcurrentHashMap() @@ -124,4 +125,31 @@ object StaffedFlowHospital : FlowHospital { return exception != null && (exception is org.hibernate.exception.ConstraintViolationException || mentionsConstraintViolation(exception.cause)) } } + + /** + * Restarts [TimedFlow], keeping track of the number of retries and making sure it does not + * exceed the limit specified by the [FlowTimeoutException]. + */ + object DoctorTimeout : Staff { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + if (newError is FlowTimeoutException) { + if (isTimedFlow(flowFiber)) { + if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) { + return Diagnosis.DISCHARGE + } else { + log.warn("\"Maximum number of retries reached for timed flow ${flowFiber.javaClass}") + } + } else { + log.warn("\"Unable to restart flow: ${flowFiber.javaClass}, it is not timed and does not contain any timed sub-flows.") + } + } + return Diagnosis.NOT_MY_SPECIALTY + } + + private fun isTimedFlow(flowFiber: FlowFiber): Boolean { + return flowFiber.snapshot().checkpoint.subFlowStack.any { + TimedFlow::class.java.isAssignableFrom(it.flowClass) + } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 72027b962a..d7f4166829 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -102,6 +102,8 @@ interface StateMachineManagerInternal { fun removeSessionBindings(sessionIds: Set) fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) fun retryFlowFromSafePoint(currentState: StateMachineState) + fun scheduleFlowTimeout(flowId: StateMachineRunId) + fun cancelFlowTimeout(flowId: StateMachineRunId) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 5f1339a1f7..64337624a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -61,7 +61,6 @@ data class StateMachineState( * @param flowState the state of the flow itself, including the frozen fiber/FlowLogic. * @param errorState the "dirtiness" state including the involved errors and their propagation status. * @param numberOfSuspends the number of flow suspends due to IO API calls. - * @param deduplicationSeed the basis seed for the deduplication ID. This is used to produce replayable IDs. */ data class Checkpoint( val invocationContext: InvocationContext, @@ -70,8 +69,7 @@ data class Checkpoint( val subFlowStack: List, val flowState: FlowState, val errorState: ErrorState, - val numberOfSuspends: Int, - val deduplicationSeed: String + val numberOfSuspends: Int ) { companion object { @@ -92,8 +90,7 @@ data class Checkpoint( subFlowStack = listOf(topLevelSubFlow), flowState = FlowState.Unstarted(flowStart, frozenFlowLogic), errorState = ErrorState.Clean, - numberOfSuspends = 0, - deduplicationSeed = deduplicationSeed + numberOfSuspends = 0 ) } } @@ -105,13 +102,19 @@ data class Checkpoint( */ sealed class SessionState { + abstract val deduplicationSeed: String + /** * We haven't yet sent the initialisation message */ data class Uninitiated( val party: Party, - val initiatingSubFlow: SubFlow.Initiating - ) : SessionState() + val initiatingSubFlow: SubFlow.Initiating, + val sourceSessionId: SessionId, + val additionalEntropy: Long + ) : SessionState() { + override val deduplicationSeed: String get() = "R-${sourceSessionId.toLong}-$additionalEntropy" + } /** * We have sent the initialisation message but have not yet received a confirmation. @@ -119,7 +122,8 @@ sealed class SessionState { */ data class Initiating( val bufferedMessages: List>, - val rejectionError: FlowError? + val rejectionError: FlowError?, + override val deduplicationSeed: String ) : SessionState() /** @@ -131,7 +135,8 @@ sealed class SessionState { val peerFlowInfo: FlowInfo, val receivedMessages: List, val initiatedState: InitiatedSessionState, - val errors: List + val errors: List, + override val deduplicationSeed: String ) : SessionState() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt index 74e4f62c94..4ea9b50a11 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt @@ -11,7 +11,19 @@ package net.corda.node.services.statemachine.transitions import net.corda.core.flows.UnexpectedFlowEndException -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.ConfirmSessionMessage +import net.corda.node.services.statemachine.DataSessionMessage +import net.corda.node.services.statemachine.EndSessionMessage +import net.corda.node.services.statemachine.ErrorSessionMessage +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.ExistingSessionMessage +import net.corda.node.services.statemachine.FlowError +import net.corda.node.services.statemachine.InitiatedSessionState +import net.corda.node.services.statemachine.RejectSessionMessage +import net.corda.node.services.statemachine.SenderDeduplicationId +import net.corda.node.services.statemachine.SessionState +import net.corda.node.services.statemachine.StateMachineState /** * This transition handles incoming session messages. It handles the following cases: @@ -72,7 +84,8 @@ class DeliverSessionMessageTransition( peerFlowInfo = message.initiatedFlowInfo, receivedMessages = emptyList(), initiatedState = InitiatedSessionState.Live(message.initiatedSessionId), - errors = emptyList() + errors = emptyList(), + deduplicationSeed = sessionState.deduplicationSeed ) val newCheckpoint = currentState.checkpoint.copy( sessions = currentState.checkpoint.sessions + (event.sessionMessage.recipientSessionId to initiatedSession) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 3a9ee6f861..b22cd99df5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -16,7 +16,22 @@ import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.toNonEmptySet -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.DataSessionMessage +import net.corda.node.services.statemachine.DeduplicationId +import net.corda.node.services.statemachine.ExistingSessionMessage +import net.corda.node.services.statemachine.FlowError +import net.corda.node.services.statemachine.FlowSessionImpl +import net.corda.node.services.statemachine.FlowState +import net.corda.node.services.statemachine.InitialSessionMessage +import net.corda.node.services.statemachine.InitiatedSessionState +import net.corda.node.services.statemachine.SenderDeduplicationId +import net.corda.node.services.statemachine.SessionId +import net.corda.node.services.statemachine.SessionMap +import net.corda.node.services.statemachine.SessionState +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.SubFlow /** * This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request @@ -224,13 +239,15 @@ class StartedFlowTransition( if (sessionState !is SessionState.Uninitiated) { continue } - val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++) - val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null) - actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) - newSessions[sourceSessionId] = SessionState.Initiating( + val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null) + val newSessionState = SessionState.Initiating( bufferedMessages = emptyList(), - rejectionError = null + rejectionError = null, + deduplicationSeed = sessionState.deduplicationSeed ) + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, newSessionState) + actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) + newSessions[sourceSessionId] = newSessionState } currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) } @@ -259,14 +276,15 @@ class StartedFlowTransition( return freshErrorTransition(CannotFindSessionException(sourceSessionId)) } else { val sessionMessage = DataSessionMessage(message) - val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++) + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, existingSessionState) when (existingSessionState) { is SessionState.Uninitiated -> { - val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, message) + val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message) actions.add(Action.SendInitial(existingSessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) newSessions[sourceSessionId] = SessionState.Initiating( bufferedMessages = emptyList(), - rejectionError = null + rejectionError = null, + deduplicationSeed = existingSessionState.deduplicationSeed ) Unit } @@ -398,12 +416,13 @@ class StartedFlowTransition( private fun createInitialSessionMessage( initiatingSubFlow: SubFlow.Initiating, sourceSessionId: SessionId, + additionalEntropy: Long, payload: SerializedBytes? ): InitialSessionMessage { return InitialSessionMessage( initiatorSessionId = sourceSessionId, // We add additional entropy to add to the initiated side's deduplication seed. - initiationEntropy = context.secureRandom.nextLong(), + initiationEntropy = additionalEntropy, initiatorFlowClassName = initiatingSubFlow.classToInitiateWith.name, flowVersion = initiatingSubFlow.flowInfo.flowVersion, appName = initiatingSubFlow.flowInfo.appName, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 3a977c0ed5..04f9776625 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -12,6 +12,7 @@ package net.corda.node.services.statemachine.transitions import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.TimedFlow import net.corda.core.utilities.Try import net.corda.node.services.statemachine.* @@ -105,11 +106,20 @@ class TopLevelTransition( val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion) when (subFlow) { is Try.Success -> { + val containsTimedSubFlows = currentState.checkpoint.subFlowStack.any { + TimedFlow::class.java.isAssignableFrom(it.flowClass) + } + val isCurrentSubFlowTimed = TimedFlow::class.java.isAssignableFrom(event.subFlowClass) currentState = currentState.copy( checkpoint = currentState.checkpoint.copy( subFlowStack = currentState.checkpoint.subFlowStack + subFlow.value ) ) + // We don't schedule a timeout if there already is a timed subflow on the stack - a timeout had + // been scheduled already. + if (isCurrentSubFlowTimed && !containsTimedSubFlows) { + actions.add(Action.ScheduleFlowTimeout(currentState.flowLogic.runId)) + } } is Try.Failure -> { freshErrorTransition(subFlow.exception) @@ -125,16 +135,26 @@ class TopLevelTransition( if (checkpoint.subFlowStack.isEmpty()) { freshErrorTransition(UnexpectedEventInState()) } else { + val lastSubFlowClass = checkpoint.subFlowStack.last().flowClass + val isLastSubFlowTimed = TimedFlow::class.java.isAssignableFrom(lastSubFlowClass) + val newSubFlowStack = checkpoint.subFlowStack.dropLast(1) currentState = currentState.copy( checkpoint = checkpoint.copy( - subFlowStack = checkpoint.subFlowStack.subList(0, checkpoint.subFlowStack.size - 1).toList() + subFlowStack = newSubFlowStack ) ) + if (isLastSubFlowTimed && !containsTimedFlows(currentState.checkpoint.subFlowStack)) { + actions.add(Action.CancelFlowTimeout(currentState.flowLogic.runId)) + } } FlowContinuation.ProcessEvents } } + private fun containsTimedFlows(subFlowStack: List): Boolean { + return subFlowStack.any { TimedFlow::class.java.isAssignableFrom(it.flowClass) } + } + private fun suspendTransition(event: Event.Suspend): TransitionResult { return builder { val newCheckpoint = currentState.checkpoint.copy( @@ -212,7 +232,7 @@ class TopLevelTransition( val sendEndMessageActions = currentState.checkpoint.sessions.values.mapIndexed { index, state -> if (state is SessionState.Initiated && state.initiatedState is InitiatedSessionState.Live) { val message = ExistingSessionMessage(state.initiatedState.peerSinkSessionId, EndSessionMessage) - val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index) + val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state) Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID)) } else { null @@ -231,7 +251,7 @@ class TopLevelTransition( } val sourceSessionId = SessionId.createRandom(context.secureRandom) val sessionImpl = FlowSessionImpl(event.party, sourceSessionId) - val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow)) + val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) actions.add(Action.AddSessionBinding(context.id, sourceSessionId)) FlowContinuation.Resume(sessionImpl) @@ -262,4 +282,4 @@ class TopLevelTransition( FlowContinuation.Abort } } -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt index 9ceef31895..aeed37c370 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt @@ -11,7 +11,17 @@ package net.corda.node.services.statemachine.transitions import net.corda.core.flows.FlowInfo -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.ConfirmSessionMessage +import net.corda.node.services.statemachine.DataSessionMessage +import net.corda.node.services.statemachine.DeduplicationId +import net.corda.node.services.statemachine.ExistingSessionMessage +import net.corda.node.services.statemachine.FlowStart +import net.corda.node.services.statemachine.FlowState +import net.corda.node.services.statemachine.InitiatedSessionState +import net.corda.node.services.statemachine.SenderDeduplicationId +import net.corda.node.services.statemachine.SessionState +import net.corda.node.services.statemachine.StateMachineState /** * This transition is responsible for starting the flow from a FlowLogic instance. It creates the first checkpoint and @@ -55,7 +65,8 @@ class UnstartedFlowTransition( } else { listOf(DataSessionMessage(initiatingMessage.firstPayload)) }, - errors = emptyList() + errors = emptyList(), + deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}" ) val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo) val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage) @@ -68,7 +79,7 @@ class UnstartedFlowTransition( Action.SendExisting( flowStart.peerSession.counterparty, sessionMessage, - SenderDeduplicationId(DeduplicationId.createForNormal(currentState.checkpoint, 0), currentState.senderUUID) + SenderDeduplicationId(DeduplicationId.createForNormal(currentState.checkpoint, 0, initiatedState), currentState.senderUUID) ) ) } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 12b8d8af23..710e545540 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -70,9 +70,9 @@ class RetryFlowMockTest { val messagesSent = mutableListOf() val partyB = internalNodeB.info.legalIdentities.first() internalNodeA.setMessagingServiceSpy(object : MessagingServiceSpy(internalNodeA.network) { - override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { + override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { messagesSent.add(message) - messagingService.send(message, target, retryId) + messagingService.send(message, target) } }) internalNodeA.startFlow(SendAndRetryFlow(1, partyB)).get() diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index ed85b659d1..6966c23eac 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -14,8 +14,18 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.Command import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef -import net.corda.core.crypto.* -import net.corda.core.flows.* +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.TransactionSignature +import net.corda.core.crypto.generateKeyPair +import net.corda.core.crypto.sha256 +import net.corda.core.crypto.sign +import net.corda.core.flows.NotarisationPayload +import net.corda.core.flows.NotarisationRequest +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryException +import net.corda.core.flows.NotaryFlow import net.corda.core.identity.Party import net.corda.core.internal.notary.generateSignature import net.corda.core.messaging.MessageRecipients @@ -36,7 +46,12 @@ import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity import net.corda.testing.node.TestClock -import net.corda.testing.node.internal.* +import net.corda.testing.node.internal.InMemoryMessage +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.MessagingServiceSpy +import net.corda.testing.node.internal.setMessagingServiceSpy +import net.corda.testing.node.internal.startFlow import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before @@ -303,7 +318,7 @@ class ValidatingNotaryServiceTests { private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) { aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) { - override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { + override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { val messageData = message.data.deserialize() as? InitialSessionMessage val payload = messageData?.firstPayload!!.deserialize() @@ -311,10 +326,10 @@ class ValidatingNotaryServiceTests { val alteredPayload = payloadModifier(payload) val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize()) val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId) - messagingService.send(alteredMessage, target, retryId) + messagingService.send(alteredMessage, target) } else { - messagingService.send(message, target, retryId) + messagingService.send(message, target) } } }) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 615931eb29..dae1619c0b 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -430,7 +430,7 @@ class InMemoryMessagingNetwork private constructor( state.locked { check(handlers.remove(registration as Handler)) } } - override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) { + override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { check(running) msgSend(this, message, target) if (!sendManuallyPumped) { @@ -439,8 +439,8 @@ class InMemoryMessagingNetwork private constructor( } override fun send(addressedMessages: List) { - for ((message, target, retryId, sequenceKey) in addressedMessages) { - send(message, target, retryId, sequenceKey) + for ((message, target, sequenceKey) in addressedMessages) { + send(message, target, sequenceKey) } } @@ -453,8 +453,6 @@ class InMemoryMessagingNetwork private constructor( netNodeHasShutdown(peerHandle) } - override fun cancelRedelivery(retryId: Long) {} - /** Returns the given (topic & session, data) pair as a newly created message object. */ override fun createMessage(topic: String, data: ByteArray, deduplicationId: SenderDeduplicationId, additionalHeaders: Map): Message { return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId.deduplicationId, senderUUID = deduplicationId.senderUUID)