Merge remote-tracking branch 'open/master' into andrius/merge-06-07

This commit is contained in:
Andrius Dagys 2018-06-07 08:53:28 +01:00
commit d24299341c
26 changed files with 526 additions and 325 deletions

View File

@ -2225,7 +2225,7 @@ public final class net.corda.core.flows.NotaryFlow extends java.lang.Object
##
@DoNotImplement
@InitiatingFlow
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic implements net.corda.core.internal.TimedFlow
public <init>(net.corda.core.transactions.SignedTransaction)
public <init>(net.corda.core.transactions.SignedTransaction, net.corda.core.utilities.ProgressTracker)
@Suspendable

View File

@ -17,6 +17,7 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.TransactionSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.notary.generateSignature
import net.corda.core.internal.notary.validateSignatures
import net.corda.core.internal.pushToLoggingContext
@ -41,8 +42,10 @@ class NotaryFlow {
*/
@DoNotImplement
@InitiatingFlow
open class Client(private val stx: SignedTransaction,
override val progressTracker: ProgressTracker) : FlowLogic<List<TransactionSignature>>() {
open class Client(
private val stx: SignedTransaction,
override val progressTracker: ProgressTracker
) : FlowLogic<List<TransactionSignature>>(), TimedFlow {
constructor(stx: SignedTransaction) : this(stx, tracker())
companion object {

View File

@ -29,13 +29,9 @@ sealed class FlowIORequest<out R : Any> {
* @property shouldRetrySend specifies whether the send should be retried.
*/
data class Send(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>
) : FlowIORequest<Unit>() {
override fun toString() = "Send(" +
"sessionToMessage=${sessionToMessage.mapValues { it.value.hash }}, " +
"shouldRetrySend=$shouldRetrySend" +
")"
override fun toString() = "Send(sessionToMessage=${sessionToMessage.mapValues { it.value.hash }})"
}
/**

View File

@ -0,0 +1,23 @@
package net.corda.core.internal
/**
* Marker interface for a flow that produces the same result when it is replayed from the beginning. Any side effects
* caused by the flow must be idempotent as well.
*
* The interface declares no members. Because an idempotent flow can safely be re-run from the start, its checkpoints
* do not need to be persisted, which allows better performance.
*/
interface IdempotentFlow
/**
* An [IdempotentFlow] that needs to be replayed if it does not complete within a certain timeout.
*
* Example use: the notary client flow. If the client sends a request to an HA notary cluster, the request will be
* accepted by one of the cluster members, but that member might crash before returning a response. The client flow
* would then be stuck waiting for that member to come back up. Retrying the notary flow re-sends the request to the
* next available notary cluster member.
*
* Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints
* persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow].
*/
// TODO: allow specifying retry settings per flow
interface TimedFlow : IdempotentFlow

View File

@ -0,0 +1,189 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.CordaService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.P2PMessagingRetryConfiguration
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.slf4j.MDC
import java.security.PublicKey
import java.util.concurrent.atomic.AtomicInteger
/**
* Runs timed flows against a mock notary cluster of [CLUSTER_SIZE] nodes whose [TestNotaryFlow] never answers the
* very first request received after the counter is reset. Each test blocks on the client flow's result and then
* verifies the required signatures, so it can only pass if the client flow still completes.
*/
class TimedFlowTests {
companion object {
/** The notary nodes don't run any consensus protocol, so 2 nodes are sufficient for the purpose of this test. */
private const val CLUSTER_SIZE = 2
/** A shared counter across all notary service nodes. */
var requestsReceived: AtomicInteger = AtomicInteger(0)
private lateinit var mockNet: InternalMockNetwork
private lateinit var notary: Party
private lateinit var node: StartedNode<InternalMockNetwork.MockNode>
init {
LogHelper.setLevel("+net.corda.flow", "+net.corda.testing.node", "+net.corda.node.services.messaging")
}
/**
* Creates the mock network once for the whole class (one thread per node, round-robin service peer allocation)
* and starts the notary cluster plus the client node.
*/
@BeforeClass
@JvmStatic
fun setup() {
mockNet = InternalMockNetwork(
listOf("net.corda.testing.contracts", "net.corda.node.services"),
MockNetworkParameters().withServicePeerAllocationStrategy(InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()),
threadPerNode = true
)
val started = startClusterAndNode(mockNet)
notary = started.first
node = started.second
}
/** Stops every node of the shared mock network after the last test. */
@AfterClass
@JvmStatic
fun stopNodes() {
mockNet.stopNodes()
}
/**
* Creates [CLUSTER_SIZE] notary nodes and one client node ("Alice"), installs the network parameters into each
* node's base directory and starts them.
*
* @return the distributed notary identity paired with the started client node.
*/
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, StartedNode<InternalMockNetwork.MockNode>> {
val replicaIds = (0 until CLUSTER_SIZE)
// The identity is generated into the base directories of the next CLUSTER_SIZE node ids, i.e. the ids the
// notary nodes created below are expected to receive - this relies on the notary nodes being created first.
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
CordaX500Name("Custom Notary", "Zurich", "CH"))
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
// Mocked notary configuration: custom, clustered and validating are all stubbed to true.
val notaryConfig = mock<NotaryConfig> {
whenever(it.custom).thenReturn(true)
whenever(it.isClusterConfig).thenReturn(true)
whenever(it.validating).thenReturn(true)
}
val notaryNodes = (0 until CLUSTER_SIZE).map {
mockNet.createUnstartedNode(InternalMockNodeParameters(configOverrides = {
doReturn(notaryConfig).whenever(it).notary
}))
}
val aliceNode = mockNet.createUnstartedNode(
InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = { conf: NodeConfiguration ->
// NOTE(review): arguments are presumably (redelivery delay, max retry count, backoff base) - confirm
// against the P2PMessagingRetryConfiguration constructor.
val retryConfig = P2PMessagingRetryConfiguration(1.seconds, 3, 1.0)
doReturn(retryConfig).whenever(conf).p2pMessagingRetry
}
)
)
// MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the
// network-parameters in their directories before they're started.
// Alice is the last element of the list, so the node kept here is the started client node.
val node = (notaryNodes + aliceNode).map { node ->
networkParameters.install(mockNet.baseDirectory(node.id))
node.start()
}.last()
return Pair(notaryIdentity, node)
}
}
/** Replaces the shared counter before each test so that the first request of every test is the one left unanswered. */
@Before
fun resetCounter() {
requestsReceived = AtomicInteger(0)
}
/** Runs [NotaryFlow.Client] directly and verifies the issued transaction together with the returned notary signatures. */
@Test
fun `timed flows are restarted`() {
node.run {
val issueTx = signInitialTransaction(notary) {
setTimeWindow(services.clock.instant(), 30.seconds)
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
val flow = NotaryFlow.Client(issueTx)
val notarySignatures = services.startFlow(flow).resultFuture.get()
(issueTx + notarySignatures).verifyRequiredSignatures()
}
}
/** Runs [FinalityFlow] for the same kind of issue transaction and verifies the signatures on the transaction it returns. */
@Test
fun `timed sub-flows are restarted`() {
node.run {
val issueTx = signInitialTransaction(notary) {
setTimeWindow(services.clock.instant(), 30.seconds)
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
val flow = FinalityFlow(issueTx)
val stx = services.startFlow(flow).resultFuture.get()
stx.verifyRequiredSignatures()
}
}
/**
* Builds a transaction for [notary] containing a dummy command for this node's single identity, applies [block] to
* the builder and signs the result with this node's services.
*/
private fun StartedNode<InternalMockNetwork.MockNode>.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
return services.signInitialTransaction(
TransactionBuilder(notary).apply {
addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey))
block()
}
)
}
/** Notary service with a mocked uniqueness provider that answers every session with a [TestNotaryFlow]. */
@CordaService
private class TestNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val uniquenessProvider = mock<UniquenessProvider>()
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}
}
/** A notary flow that will yield without returning a response on the very first received request. */
private class TestNotaryFlow(otherSide: FlowSession, service: TestNotaryService) : NotaryServiceFlow(otherSide, service) {
@Suspendable
override fun validateRequest(requestPayload: NotarisationPayload): TransactionParts {
val myIdentity = serviceHub.myInfo.legalIdentities.first()
MDC.put("name", myIdentity.name.toString())
logger.info("Received a request from ${otherSideSession.counterparty.name}")
val stx = requestPayload.signedTransaction
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
// The counter is shared by all notary nodes, so only the first request overall is left hanging.
if (TimedFlowTests.requestsReceived.getAndIncrement() == 0) {
logger.info("Ignoring")
// Waiting forever
// NOTE(review): relies on a transaction with a random id never being committed to the ledger.
stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
} else {
logger.info("Processing")
}
return TransactionParts(stx.id, stx.inputs, stx.tx.timeWindow, stx.notary)
}
}
}

View File

@ -12,9 +12,7 @@ package net.corda.services.messaging
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.randomOrNull
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
@ -33,7 +31,6 @@ import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.internalServices
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.NotarySpec
@ -41,10 +38,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.ClassRule
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
class P2PMessagingTest : IntegrationTest() {
private companion object {
@ -62,73 +56,6 @@ class P2PMessagingTest : IntegrationTest() {
}
}
/**
 * Sends a single request with a retry id to the distributed service while every service node ignores requests,
 * checks that no response has arrived, then lets the nodes respond and expects the retried request to be answered
 * with the configured response message.
 */
@Test
fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() {
    startDriverWithDistributedService { distributedServiceNodes ->
        val alice = startAlice()
        val serviceAddress = alice.services.networkMapCache.run {
            val notaryParty = notaryIdentities.randomOrNull()!!
            alice.internalServices.networkService.getAddressOfParty(getPartyInfo(notaryParty)!!)
        }

        val responseMessage = "response"

        val crashingNodes = simulateCrashingNodes(distributedServiceNodes, responseMessage)

        // Send a single request with retry
        val responseFuture = alice.receiveFrom(serviceAddress, retryId = 0)
        // await() returns false on timeout. Without this assertion the test would carry on even though no request
        // was ever delivered, and could then pass on the first delivery without exercising a retry at all.
        assertThat(crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)).isTrue()
        // The request wasn't successful.
        assertThat(responseFuture.isDone).isFalse()
        crashingNodes.ignoreRequests = false

        // The retry should be successful.
        val response = responseFuture.getOrThrow(10.seconds)
        assertThat(response).isEqualTo(responseMessage)
    }
}
/**
* Sends a request with a retry id while all service nodes ignore requests, stops the client node once the first
* request has been received, then restarts the client and expects a response to arrive together with at least one
* further request having reached the service nodes.
*/
@Test
fun `distributed service request retries are persisted across client node restarts`() {
startDriverWithDistributedService { distributedServiceNodes ->
val alice = startAlice()
val serviceAddress = alice.services.networkMapCache.run {
val notaryParty = notaryIdentities.randomOrNull()!!
alice.internalServices.networkService.getAddressOfParty(getPartyInfo(notaryParty)!!)
}
val responseMessage = "response"
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, responseMessage)
// Send a single request with retry
alice.receiveFrom(serviceAddress, retryId = 0)
// Wait until the first request is received
crashingNodes.firstRequestReceived.await()
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.stop()
// Snapshot the request count so that later growth can be attributed to redelivery after the restart.
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
crashingNodes.ignoreRequests = false
// Restart the node and expect a response
val aliceRestarted = startAlice()
val responseFuture = openFuture<Any>()
aliceRestarted.internalServices.networkService.runOnNextMessage("test.response") {
responseFuture.set(it.data.deserialize())
}
val response = responseFuture.getOrThrow()
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
assertThat(response).isEqualTo(responseMessage)
}
}
private fun startDriverWithDistributedService(dsl: DriverDSL.(List<InProcess>) -> Unit) {
driver(DriverParameters(
startNodesInProcess = true,
@ -138,55 +65,6 @@ class P2PMessagingTest : IntegrationTest() {
}
}
/**
 * Starts an in-process node named [ALICE_NAME] whose "p2pMessagingRetry" configuration is overridden with a
 * one second redelivery delay, a backoff base of 1.0 and at most 3 retries, and waits for it to come up.
 */
private fun DriverDSL.startAlice(): InProcess {
    val retrySettings = mapOf("messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3)
    val aliceHandle = startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to retrySettings))
    return aliceHandle.map { (it as InProcess) }.getOrThrow()
}
/**
* Shared state read and updated by the request handlers that [simulateCrashingNodes] installs.
*
* @property firstRequestReceived latch counted down by every handled request, so it opens once the first request arrives.
* @property requestsReceived number of requests handled so far across all service nodes.
* @property ignoreRequests while true, requests are counted but no response is sent.
*/
data class CrashingNodes(
val firstRequestReceived: CountDownLatch,
val requestsReceived: AtomicInteger,
var ignoreRequests: Boolean
)
/**
 * Installs a "test.request" handler on every node in [distributedServiceNodes]. Each handler counts the request and
 * then either drops it or replies on "test.response" with [responseMessage], depending on
 * [CrashingNodes.ignoreRequests], which starts out as true. Dropping requests simulates service nodes that receive
 * a request message but crash before sending back a response.
 *
 * @return the shared [CrashingNodes] state observed and updated by all installed handlers.
 */
private fun simulateCrashingNodes(distributedServiceNodes: List<InProcess>, responseMessage: String): CrashingNodes {
    val sharedState = CrashingNodes(
            firstRequestReceived = CountDownLatch(1),
            requestsReceived = AtomicInteger(0),
            ignoreRequests = true
    )

    for (serviceNode in distributedServiceNodes) {
        val nodeName = serviceNode.services.myInfo.chooseIdentity().name
        serviceNode.internalServices.networkService.addMessageHandler("test.request") { netMessage, _, handler ->
            sharedState.requestsReceived.incrementAndGet()
            sharedState.firstRequestReceived.countDown()
            // Until the flag is flipped, every node that receives a request ignores it.
            print("$nodeName: Received request - ")
            if (!sharedState.ignoreRequests) {
                println("sending response")
                val request = netMessage.data.deserialize<TestRequest>()
                val response = serviceNode.internalServices.networkService.createMessage("test.response", responseMessage.serialize().bytes)
                serviceNode.internalServices.networkService.send(response, request.replyTo)
            } else {
                // Ignoring the request simulates a service node crashing before it sends back a response.
                // A retry by the client will result in the message being redelivered to another node in the cluster.
                println("ignoring")
            }
            handler.afterDatabaseTransaction()
        }
    }
    return sharedState
}
private fun assertAllNodesAreUsed(participatingServiceNodes: List<InProcess>, serviceName: CordaX500Name, originatingNode: InProcess) {
// Set up each node in the distributed service to return its NodeInfo so that we can know which node is being used
participatingServiceNodes.forEach { node ->
@ -217,12 +95,12 @@ class P2PMessagingTest : IntegrationTest() {
}
}
private fun InProcess.receiveFrom(target: MessageRecipients, retryId: Long? = null): CordaFuture<Any> {
private fun InProcess.receiveFrom(target: MessageRecipients): CordaFuture<Any> {
val response = openFuture<Any>()
internalServices.networkService.runOnNextMessage("test.response") { netMessage ->
response.set(netMessage.data.deserialize())
}
internalServices.networkService.send("test.request", TestRequest(replyTo = internalServices.networkService.myAddress), target, retryId = retryId)
internalServices.networkService.send("test.request", TestRequest(replyTo = internalServices.networkService.myAddress), target)
return response
}

View File

@ -74,8 +74,6 @@ interface MessagingService {
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
* to send an ACK message back.
*
* @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id.
* Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
* @param sequenceKey an object that may be used to enable a parallel [MessagingService] implementation. Two
* subsequent send()s with the same [sequenceKey] (up to equality) are guaranteed to be delivered in the same
* sequence the send()s were called. By default this is chosen conservatively to be [target].
@ -84,7 +82,6 @@ interface MessagingService {
fun send(
message: Message,
target: MessageRecipients,
retryId: Long? = null,
sequenceKey: Any = target
)
@ -92,7 +89,6 @@ interface MessagingService {
data class AddressedMessage(
val message: Message,
val target: MessageRecipients,
val retryId: Long? = null,
val sequenceKey: Any = target
)
@ -105,9 +101,6 @@ interface MessagingService {
@Suspendable
fun send(addressedMessages: List<AddressedMessage>)
/** Cancels the scheduled message redelivery for the specified [retryId] */
fun cancelRedelivery(retryId: Long)
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*
@ -125,7 +118,7 @@ interface MessagingService {
val myAddress: SingleMessageRecipient
}
fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), retryId: Long? = null, additionalHeaders: Map<String, String> = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to, retryId)
fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), additionalHeaders: Map<String, String> = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to)
interface MessageHandlerRegistration

View File

@ -26,7 +26,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.internal.LifecycleSupport
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
@ -37,43 +41,41 @@ import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.requireMessageSize
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.io.Serializable
import java.security.PublicKey
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
/**
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
@ -119,35 +121,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
companion object {
private val log = contextLogger()
/**
* Creates the [PersistentMap] that stores messages awaiting redelivery. Each entry is persisted as a [RetryMessage]
* whose `message` and `recipients` fields hold the value pair serialised with
* [SerializationDefaults.STORAGE_CONTEXT]; reading an entry deserialises both fields with the same context.
*/
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
return PersistentMap(
// The map key is used directly as the entity key.
toPersistentEntityKey = { it },
fromPersistentEntity = {
Pair(it.key,
Pair(it.message.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
it.recipients.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))
)
},
toPersistentEntity = { _key: Long, (_message: Message, _recipient: MessageRecipients): Pair<Message, MessageRecipients> ->
RetryMessage().apply {
key = _key
message = _message.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
recipients = _recipient.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
}
},
persistentEntityClass = RetryMessage::class.java
)
}
/**
* [Message] implementation that exposes the values passed to its constructor unchanged; [debugTimestamp] is captured
* when the instance is created.
*/
class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val additionalHeaders: Map<String, String>) : Message {
override val debugTimestamp: Instant = Instant.now()
// Renders the topic followed by the payload bytes decoded as a string.
override fun toString() = "$topic#${String(data.bytes)}"
}
}
private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount
private val backoffBase: Double = config.p2pMessagingRetry.backoffBase
private class InnerState {
var started = false
var running = false
@ -163,17 +142,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
}
private val messagesToRedeliver = createMessageToRedeliver()
private val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
/** A registration to handle messages of different types */
data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
override val ourSenderUUID = UUID.randomUUID().toString()
private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
@ -184,21 +158,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val deduplicator = P2PMessageDeduplicator(database)
internal var messagingExecutor: MessagingExecutor? = null
/**
* JPA entity for one message awaiting redelivery, stored in the "${NODE_DATABASE_PREFIX}message_retry" table.
* [createMessageToRedeliver] fills [message] and [recipients] with the serialised message and its serialised
* recipients.
*/
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry")
class RetryMessage(
// Primary key, mapped to the "message_id" column.
@Id
@Column(name = "message_id", length = 64, nullable = false)
var key: Long = 0,
// Serialised message bytes.
@Lob
@Column(nullable = false)
var message: ByteArray = EMPTY_BYTE_ARRAY,
// Serialised recipients bytes.
@Lob
@Column(nullable = false)
var recipients: ByteArray = EMPTY_BYTE_ARRAY
) : Serializable
fun start() {
state.locked {
started = true
@ -255,8 +214,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
registerBridgeControl(bridgeSession!!, inboxes.toList())
enumerateBridges(bridgeSession!!, inboxes.toList())
}
resumeMessageRedelivery()
}
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
@ -355,12 +312,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
sendBridgeControl(startupMessage)
}
/** Re-sends every message stored in [messagesToRedeliver] under its original retry id. */
private fun resumeMessageRedelivery() {
    messagesToRedeliver.forEach { retryId, pending ->
        val (message, target) = pending
        send(message, target, retryId)
    }
}
private val shutdownLatch = CountDownLatch(1)
/**
@ -534,53 +485,15 @@ class P2PMessagingClient(val config: NodeConfiguration,
override fun close() = stop()
@Suspendable
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
requireMessageSize(message.data.size, maxMessageSize)
messagingExecutor!!.send(message, target)
retryId?.let {
database.transaction {
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
}
scheduledMessageRedeliveries[it] = nodeExecutor.schedule({
sendWithRetry(0, message, target, retryId)
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
}
}
@Suspendable
override fun send(addressedMessages: List<MessagingService.AddressedMessage>) {
for ((message, target, retryId, sequenceKey) in addressedMessages) {
send(message, target, retryId, sequenceKey)
}
}
/**
 * Re-sends [message] to [target] and schedules the next attempt, giving up once [retryCount] reaches
 * [messageMaxRetryCount].
 *
 * Every attempt is sent under a deduplication id made of the original id suffixed with the retry count. The delay
 * before the next attempt is `messageRedeliveryDelaySeconds * backoffBase^retryCount`, truncated to whole seconds.
 *
 * @param retryCount zero-based number of the retry being attempted.
 * @param message the message to redeliver.
 * @param target the recipients of the message.
 * @param retryId the id under which the scheduled redelivery is tracked in [scheduledMessageRedeliveries].
 */
private fun sendWithRetry(retryCount: Int, message: Message, target: MessageRecipients, retryId: Long) {
    log.trace { "Attempting to retry #$retryCount message delivery for $retryId" }
    if (retryCount >= messageMaxRetryCount) {
        log.warn("Reached the maximum number of retries ($messageMaxRetryCount) for message $message redelivery to $target")
        scheduledMessageRedeliveries.remove(retryId)
        return
    }

    val messageWithRetryCount = object : Message by message {
        override val uniqueMessageId = DeduplicationId("${message.uniqueMessageId.toString}-$retryCount")
    }

    messagingExecutor!!.send(messageWithRetryCount, target)

    // Multiply first and truncate last: truncating the backoff factor on its own turns e.g. 1.5^1 into 1 and
    // silently removes the backoff for fractional bases.
    val nextDelaySeconds = (messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble())).toLong()
    scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({
        sendWithRetry(retryCount + 1, message, target, retryId)
    }, nextDelaySeconds, TimeUnit.SECONDS)
}
override fun cancelRedelivery(retryId: Long) {
database.transaction {
messagesToRedeliver.remove(retryId)
}
scheduledMessageRedeliveries[retryId]?.let {
log.trace { "Cancelling message redelivery for retry id $retryId" }
if (!it.isDone) it.cancel(true)
scheduledMessageRedeliveries.remove(retryId)
for ((message, target, sequenceKey) in addressedMessages) {
send(message, target, sequenceKey)
}
}

View File

@ -13,20 +13,23 @@ package net.corda.node.services.schema
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.LinearState
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.SchemaService.SchemaOptions
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.P2PMessageDeduplicator
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.persistence.*
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.RunOnceService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
@ -52,7 +55,6 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
NodeSchedulerService.PersistentScheduledState::class.java,
NodeAttachmentService.DBAttachment::class.java,
P2PMessageDeduplicator.ProcessedMessage::class.java,
P2PMessagingClient.RetryMessage::class.java,
PersistentIdentityService.PersistentIdentity::class.java,
PersistentIdentityService.PersistentIdentityNames::class.java,
ContractUpgradeServiceImpl.DBContractUpgrade::class.java,

View File

@ -145,6 +145,19 @@ sealed class Action {
* Retry a flow from the last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details.
*/
data class RetryFlowFromSafePoint(val currentState: StateMachineState) : Action()
/**
* Schedule the flow [flowId] to be retried if it does not complete within the timeout period specified in the configuration.
*
* Note that this only works with [TimedFlow].
*
* @property flowId the run id of the flow whose timeout is scheduled.
*/
data class ScheduleFlowTimeout(val flowId: StateMachineRunId) : Action()
/**
* Cancel the retry timeout for flow [flowId]. This must be called when a timed flow completes to prevent
* unnecessary additional invocations.
*
* @property flowId the run id of the flow whose timeout is cancelled.
*/
data class CancelFlowTimeout(val flowId: StateMachineRunId) : Action()
}
/**

View File

@ -84,9 +84,10 @@ class ActionExecutorImpl(
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
is Action.ScheduleFlowTimeout -> scheduleFlowTimeout(action)
is Action.CancelFlowTimeout -> cancelFlowTimeout(action)
}
}
/**
 * Releases the vault soft locks held under the lock ID carried by [action].
 * Does nothing when the action carries no lock ID.
 */
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
    val lockId = action.uuid ?: return
    services.vaultService.softLockRelease(lockId)
}
@ -244,4 +245,12 @@ class ActionExecutorImpl(
/** Serialises [checkpoint] using this executor's checkpoint serialization context. */
private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes<Checkpoint> =
    checkpoint.serialize(context = checkpointSerializationContext)
/** Executes [Action.CancelFlowTimeout] by asking the state machine manager to cancel the timeout of the action's flow. */
private fun cancelFlowTimeout(action: Action.CancelFlowTimeout) =
    stateMachineManager.cancelFlowTimeout(action.flowId)
/** Executes [Action.ScheduleFlowTimeout] by asking the state machine manager to schedule a timeout for the action's flow. */
private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) =
    stateMachineManager.scheduleFlowTimeout(action.flowId)
}

View File

@ -28,16 +28,16 @@ data class DeduplicationId(val toString: String) {
* creating IDs in case the message-generating flow logic is replayed on hard failure.
*
* A normal deduplication ID consists of:
* 1. A deduplication seed set per flow. This is either the flow's ID or in case of an initated flow the
* initiator's session ID.
* 1. A deduplication seed set per session. This is the initiator's session ID, with a prefix for initiator
* or initiated.
* 2. The number of *clean* suspends since the start of the flow.
* 3. An optional additional index, for cases where several messages are sent as part of the state transition.
* Note that care must be taken with this index, it must be a deterministic counter. For example a naive
* iteration over a HashMap will produce a different list of indices than a previous run, causing the
* message-id map to change, which means deduplication will not happen correctly.
*/
fun createForNormal(checkpoint: Checkpoint, index: Int): DeduplicationId {
return DeduplicationId("N-${checkpoint.deduplicationSeed}-${checkpoint.numberOfSuspends}-$index")
fun createForNormal(checkpoint: Checkpoint, index: Int, session: SessionState): DeduplicationId {
return DeduplicationId("N-${session.deduplicationSeed}-${checkpoint.numberOfSuspends}-$index")
}
/**

View File

@ -83,8 +83,7 @@ class FlowSessionImpl(
@Suspendable
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
val request = FlowIORequest.Send(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT))
)
return getFlowStateMachine().suspend(request, maySkipCheckpoint)
}

View File

@ -148,7 +148,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
val eventQueue = getTransientField(TransientValues::eventQueue)
try {
eventLoop@while (true) {
eventLoop@ while (true) {
val nextEvent = eventQueue.receive()
val continuation = processEvent(transitionExecutor, nextEvent)
when (continuation) {
@ -336,11 +336,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
parkAndSerialize { _, _ ->
logger.trace { "Suspended on $ioRequest" }
// Will skip checkpoint if there are any idempotent flows in the subflow stack.
val skipPersistingCheckpoint = containsIdempotentFlows() || maySkipCheckpoint
contextTransactionOrNull = transaction.value
val event = try {
Event.Suspend(
ioRequest = ioRequest,
maySkipCheckpoint = maySkipCheckpoint,
maySkipCheckpoint = skipPersistingCheckpoint,
fiber = this.serialize(context = serializationContext.value)
)
} catch (throwable: Throwable) {
@ -364,6 +367,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
))
}
/**
 * Returns true if any entry on the current sub-flow stack (taken from the latest state snapshot)
 * is a flow class implementing [IdempotentFlow].
 */
private fun containsIdempotentFlows(): Boolean =
    snapshot().checkpoint.subFlowStack.any { subFlow ->
        IdempotentFlow::class.java.isAssignableFrom(subFlow.flowClass)
    }
@Suspendable
override fun scheduleEvent(event: Event) {
getTransientField(TransientValues::eventQueue).send(event)

View File

@ -0,0 +1,9 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaException
/**
* This exception is fired once the retry timeout of a [TimedFlow] expires.
* It will indicate to the flow hospital to restart the flow.
*
* @property maxRetries the retry limit carried along with the timeout; the flow hospital compares its
* discharge history for the flow against this value before restarting it again.
*/
data class FlowTimeoutException(val maxRetries: Int) : CordaException("replaying flow from the last checkpoint")

View File

@ -18,25 +18,17 @@ import co.paralleluniverse.strands.channels.Channels
import com.codahale.metrics.Gauge
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.*
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
@ -49,11 +41,6 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -65,11 +52,11 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.security.SecureRandom
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.concurrent.withLock
import kotlin.streams.toList
@ -93,18 +80,28 @@ class SingleThreadedStateMachineManager(
private class Flow(val fiber: FlowStateMachineImpl<*>, val resultFuture: OpenFuture<Any?>)
/** Book-keeping entry for a flow that currently has a retry timeout scheduled. */
private data class ScheduledTimeout(
/** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */
val scheduledFuture: ScheduledFuture<*>,
/**
 * Specifies the number of times a timeout has been scheduled for this flow so far. When the timeout is
 * re-scheduled this value is used as the backoff exponent for the next delay.
 */
val retryCount: Int = 0
)
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private class InnerState {
val changesPublisher = PublishSubject.create<StateMachineManager.Change>()!!
// True if we're shutting down, so don't resume anything.
/** True if we're shutting down, so don't resume anything. */
var stopping = false
val flows = HashMap<StateMachineRunId, Flow>()
val startedFutures = HashMap<StateMachineRunId, OpenFuture<Unit>>()
/** Flows scheduled to be retried if not finished within the specified timeout period. */
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
}
private val mutex = ThreadBox(InnerState())
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
// Monitoring support.
@ -219,8 +216,8 @@ class SingleThreadedStateMachineManager(
}
override fun killFlow(id: StateMachineRunId): Boolean {
return mutex.locked {
cancelTimeoutIfScheduled(id)
val flow = flows.remove(id)
if (flow != null) {
logger.debug("Killing flow known to physical node.")
@ -272,6 +269,7 @@ class SingleThreadedStateMachineManager(
override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) {
mutex.locked {
cancelTimeoutIfScheduled(flowId)
val flow = flows.remove(flowId)
if (flow != null) {
decrementLiveFibers()
@ -436,10 +434,11 @@ class SingleThreadedStateMachineManager(
"unknown session $recipientId, discarding..."
}
} else {
throw IllegalArgumentException("Cannot find flow corresponding to session ID $recipientId")
logger.warn("Cannot find flow corresponding to session ID $recipientId.")
}
} else {
val flow = mutex.locked { flows[flowId] } ?: throw IllegalStateException("Cannot find fiber corresponding to ID $flowId")
val flow = mutex.locked { flows[flowId] }
?: throw IllegalStateException("Cannot find fiber corresponding to ID $flowId")
flow.fiber.scheduleEvent(Event.DeliverSessionMessage(sessionMessage, deduplicationHandler, sender))
}
} catch (exception: Exception) {
@ -454,6 +453,7 @@ class SingleThreadedStateMachineManager(
val payload = RejectSessionMessage(message, errorId)
return ExistingSessionMessage(initiatorSessionId, payload)
}
val replyError = try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
@ -496,8 +496,8 @@ class SingleThreadedStateMachineManager(
} catch (e: ClassCastException) {
throw SessionRejectException("${message.initiatorFlowClassName} is not a flow")
}
return serviceHub.getFlowFactory(initiatingFlowClass) ?:
throw SessionRejectException("$initiatingFlowClass is not registered")
return serviceHub.getFlowFactory(initiatingFlowClass)
?: throw SessionRejectException("$initiatingFlowClass is not registered")
}
private fun <A> startInitiatedFlow(
@ -542,7 +542,7 @@ class SingleThreadedStateMachineManager(
flowLogic.stateMachine = flowStateMachineImpl
val frozenFlowLogic = (flowLogic as FlowLogic<*>).serialize(context = checkpointSerializationContext!!)
val flowCorDappVersion= createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, deduplicationSeed, flowCorDappVersion).getOrThrow()
val startedFuture = openFuture<Unit>()
@ -566,6 +566,59 @@ class SingleThreadedStateMachineManager(
return startedFuture.map { flowStateMachineImpl as FlowStateMachine<A> }
}
/** Schedules (or re-schedules) the retry timeout for [flowId] while holding the manager's state lock. */
override fun scheduleFlowTimeout(flowId: StateMachineRunId): Unit = mutex.locked { scheduleTimeout(flowId) }
/** Cancels the retry timeout for [flowId], if one is scheduled, while holding the manager's state lock. */
override fun cancelFlowTimeout(flowId: StateMachineRunId): Unit = mutex.locked { cancelTimeoutIfScheduled(flowId) }
/**
 * Arranges for the flow [flowId] to be retried if it has not finished once the configured timeout
 * period elapses. Any timeout already pending for the flow is cancelled and replaced, and the retry
 * counter recorded for the flow is incremented.
 *
 * If the flow is not known to this manager a warning is logged and nothing is scheduled.
 *
 * Assumes lock is taken on the [InnerState].
 */
private fun InnerState.scheduleTimeout(flowId: StateMachineRunId) {
    val flow = flows[flowId]
    if (flow == null) {
        logger.warn("Unable to schedule timeout for flow $flowId flow not found.")
        return
    }
    val previousTimeout = timedFlows[flowId]
    // Carry the retry count over from the previous timeout, starting from zero on first scheduling.
    val retryCount = previousTimeout?.retryCount ?: 0
    // Make sure a still-pending timeout does not fire in addition to the new one.
    previousTimeout?.scheduledFuture?.let { pending ->
        if (!pending.isDone) pending.cancel(true)
    }
    val newFuture = scheduleTimeoutException(flow, retryCount)
    timedFlows[flowId] = ScheduledTimeout(newFuture, retryCount + 1)
}
/**
 * Schedules a [FlowTimeoutException] to be fired in order to restart the flow.
 *
 * The delay is `messageRedeliveryDelay * backoffBase^retryCount` seconds, taken from the
 * `p2pMessagingRetry` configuration. The product is truncated to whole seconds only after the
 * multiplication, so that a fractional `backoffBase` (e.g. 1.5) still increases the delay on every retry
 * instead of being rounded down to an integer multiplier first.
 *
 * @param flow the flow whose fiber will receive the error event when the timeout fires.
 * @param retryCount the backoff exponent, i.e. how many timeouts have previously been scheduled for the flow.
 * @return the future of the scheduled timeout task, which can be used to cancel it.
 */
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
    return with(serviceHub.configuration.p2pMessagingRetry) {
        val backoffMultiplier = Math.pow(backoffBase, retryCount.toDouble())
        // Convert to Long after multiplying; converting the multiplier alone would drop its fractional part.
        val timeoutDelaySeconds = (messageRedeliveryDelay.seconds * backoffMultiplier).toLong()
        timeoutScheduler.schedule({
            val event = Event.Error(FlowTimeoutException(maxRetryCount))
            flow.fiber.scheduleEvent(event)
        }, timeoutDelaySeconds, TimeUnit.SECONDS)
    }
}
/**
 * Drops the timeout entry recorded for [flowId], cancelling its future if it has not completed yet.
 * Does nothing when no timeout is recorded for the flow.
 *
 * Assumes lock is taken on the [InnerState].
 */
private fun InnerState.cancelTimeoutIfScheduled(flowId: StateMachineRunId) {
    val pending = timedFlows.remove(flowId)?.scheduledFuture ?: return
    if (!pending.isDone) pending.cancel(true)
}
private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes<Checkpoint>): Checkpoint? {
return try {
serializedCheckpoint.deserialize(context = checkpointSerializationContext!!)
@ -673,6 +726,8 @@ class SingleThreadedStateMachineManager(
} else {
oldFlow.resultFuture.captureLater(flow.resultFuture)
}
val flowLogic = flow.fiber.logic
if (flowLogic is TimedFlow) scheduleTimeout(id)
flow.fiber.scheduleEvent(Event.DoRemainingWork)
when (checkpoint.flowState) {
is FlowState.Unstarted -> {

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.TimedFlow
import net.corda.core.utilities.loggerFor
import java.sql.SQLException
import java.time.Instant
@ -12,7 +13,7 @@ import java.util.concurrent.ConcurrentHashMap
object StaffedFlowHospital : FlowHospital {
private val log = loggerFor<StaffedFlowHospital>()
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist)
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout)
private val patients = ConcurrentHashMap<StateMachineRunId, MedicalHistory>()
@ -124,4 +125,31 @@ object StaffedFlowHospital : FlowHospital {
return exception != null && (exception is org.hibernate.exception.ConstraintViolationException || mentionsConstraintViolation(exception.cause))
}
}
/**
 * Restarts [TimedFlow], keeping track of the number of retries and making sure it does not
 * exceed the limit specified by the [FlowTimeoutException].
 *
 * Only [FlowTimeoutException] errors are handled; every other error is reported as
 * [Diagnosis.NOT_MY_SPECIALTY].
 */
object DoctorTimeout : Staff {
    /**
     * Discharges the flow (so that it is retried) when [newError] is a [FlowTimeoutException], the flow
     * has a [TimedFlow] on its sub-flow stack and the retry limit carried by the exception has not been
     * reached according to [history]. In all other cases returns [Diagnosis.NOT_MY_SPECIALTY], logging a
     * warning when a timeout could not be acted upon.
     */
    override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
        if (newError !is FlowTimeoutException) return Diagnosis.NOT_MY_SPECIALTY

        if (!isTimedFlow(flowFiber)) {
            log.warn("Unable to restart flow: ${flowFiber.javaClass}, it is not timed and does not contain any timed sub-flows.")
            return Diagnosis.NOT_MY_SPECIALTY
        }

        if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
            return Diagnosis.DISCHARGE
        }

        log.warn("Maximum number of retries reached for timed flow ${flowFiber.javaClass}")
        return Diagnosis.NOT_MY_SPECIALTY
    }

    /** Returns true if any sub-flow on the fiber's current sub-flow stack implements [TimedFlow]. */
    private fun isTimedFlow(flowFiber: FlowFiber): Boolean {
        return flowFiber.snapshot().checkpoint.subFlowStack.any {
            TimedFlow::class.java.isAssignableFrom(it.flowClass)
        }
    }
}
}

View File

@ -102,6 +102,8 @@ interface StateMachineManagerInternal {
fun removeSessionBindings(sessionIds: Set<SessionId>)
fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState)
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)
}
/**

View File

@ -61,7 +61,6 @@ data class StateMachineState(
* @param flowState the state of the flow itself, including the frozen fiber/FlowLogic.
* @param errorState the "dirtiness" state including the involved errors and their propagation status.
* @param numberOfSuspends the number of flow suspends due to IO API calls.
* @param deduplicationSeed the basis seed for the deduplication ID. This is used to produce replayable IDs.
*/
data class Checkpoint(
val invocationContext: InvocationContext,
@ -70,8 +69,7 @@ data class Checkpoint(
val subFlowStack: List<SubFlow>,
val flowState: FlowState,
val errorState: ErrorState,
val numberOfSuspends: Int,
val deduplicationSeed: String
val numberOfSuspends: Int
) {
companion object {
@ -92,8 +90,7 @@ data class Checkpoint(
subFlowStack = listOf(topLevelSubFlow),
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean,
numberOfSuspends = 0,
deduplicationSeed = deduplicationSeed
numberOfSuspends = 0
)
}
}
@ -105,13 +102,19 @@ data class Checkpoint(
*/
sealed class SessionState {
abstract val deduplicationSeed: String
/**
* We haven't yet sent the initialisation message
*/
data class Uninitiated(
val party: Party,
val initiatingSubFlow: SubFlow.Initiating
) : SessionState()
val initiatingSubFlow: SubFlow.Initiating,
val sourceSessionId: SessionId,
val additionalEntropy: Long
) : SessionState() {
override val deduplicationSeed: String get() = "R-${sourceSessionId.toLong}-$additionalEntropy"
}
/**
* We have sent the initialisation message but have not yet received a confirmation.
@ -119,7 +122,8 @@ sealed class SessionState {
*/
data class Initiating(
val bufferedMessages: List<Pair<DeduplicationId, ExistingSessionMessagePayload>>,
val rejectionError: FlowError?
val rejectionError: FlowError?,
override val deduplicationSeed: String
) : SessionState()
/**
@ -131,7 +135,8 @@ sealed class SessionState {
val peerFlowInfo: FlowInfo,
val receivedMessages: List<DataSessionMessage>,
val initiatedState: InitiatedSessionState,
val errors: List<FlowError>
val errors: List<FlowError>,
override val deduplicationSeed: String
) : SessionState()
}

View File

@ -11,7 +11,19 @@
package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ConfirmSessionMessage
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.EndSessionMessage
import net.corda.node.services.statemachine.ErrorSessionMessage
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.RejectSessionMessage
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
/**
* This transition handles incoming session messages. It handles the following cases:
@ -72,7 +84,8 @@ class DeliverSessionMessageTransition(
peerFlowInfo = message.initiatedFlowInfo,
receivedMessages = emptyList(),
initiatedState = InitiatedSessionState.Live(message.initiatedSessionId),
errors = emptyList()
errors = emptyList(),
deduplicationSeed = sessionState.deduplicationSeed
)
val newCheckpoint = currentState.checkpoint.copy(
sessions = currentState.checkpoint.sessions + (event.sessionMessage.recipientSessionId to initiatedSession)

View File

@ -16,7 +16,22 @@ import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.toNonEmptySet
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitialSessionMessage
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionMap
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.SubFlow
/**
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
@ -224,13 +239,15 @@ class StartedFlowTransition(
if (sessionState !is SessionState.Uninitiated) {
continue
}
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++)
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null)
actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = SessionState.Initiating(
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null)
val newSessionState = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null
rejectionError = null,
deduplicationSeed = sessionState.deduplicationSeed
)
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, newSessionState)
actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = newSessionState
}
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
}
@ -259,14 +276,15 @@ class StartedFlowTransition(
return freshErrorTransition(CannotFindSessionException(sourceSessionId))
} else {
val sessionMessage = DataSessionMessage(message)
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++)
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, existingSessionState)
when (existingSessionState) {
is SessionState.Uninitiated -> {
val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, message)
val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message)
actions.add(Action.SendInitial(existingSessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null
rejectionError = null,
deduplicationSeed = existingSessionState.deduplicationSeed
)
Unit
}
@ -398,12 +416,13 @@ class StartedFlowTransition(
private fun createInitialSessionMessage(
initiatingSubFlow: SubFlow.Initiating,
sourceSessionId: SessionId,
additionalEntropy: Long,
payload: SerializedBytes<Any>?
): InitialSessionMessage {
return InitialSessionMessage(
initiatorSessionId = sourceSessionId,
// We add additional entropy to add to the initiated side's deduplication seed.
initiationEntropy = context.secureRandom.nextLong(),
initiationEntropy = additionalEntropy,
initiatorFlowClassName = initiatingSubFlow.classToInitiateWith.name,
flowVersion = initiatingSubFlow.flowInfo.flowVersion,
appName = initiatingSubFlow.flowInfo.appName,

View File

@ -12,6 +12,7 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.TimedFlow
import net.corda.core.utilities.Try
import net.corda.node.services.statemachine.*
@ -105,11 +106,20 @@ class TopLevelTransition(
val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion)
when (subFlow) {
is Try.Success -> {
val containsTimedSubFlows = currentState.checkpoint.subFlowStack.any {
TimedFlow::class.java.isAssignableFrom(it.flowClass)
}
val isCurrentSubFlowTimed = TimedFlow::class.java.isAssignableFrom(event.subFlowClass)
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
subFlowStack = currentState.checkpoint.subFlowStack + subFlow.value
)
)
// We don't schedule a timeout if there already is a timed subflow on the stack - a timeout had
// been scheduled already.
if (isCurrentSubFlowTimed && !containsTimedSubFlows) {
actions.add(Action.ScheduleFlowTimeout(currentState.flowLogic.runId))
}
}
is Try.Failure -> {
freshErrorTransition(subFlow.exception)
@ -125,16 +135,26 @@ class TopLevelTransition(
if (checkpoint.subFlowStack.isEmpty()) {
freshErrorTransition(UnexpectedEventInState())
} else {
val lastSubFlowClass = checkpoint.subFlowStack.last().flowClass
val isLastSubFlowTimed = TimedFlow::class.java.isAssignableFrom(lastSubFlowClass)
val newSubFlowStack = checkpoint.subFlowStack.dropLast(1)
currentState = currentState.copy(
checkpoint = checkpoint.copy(
subFlowStack = checkpoint.subFlowStack.subList(0, checkpoint.subFlowStack.size - 1).toList()
subFlowStack = newSubFlowStack
)
)
if (isLastSubFlowTimed && !containsTimedFlows(currentState.checkpoint.subFlowStack)) {
actions.add(Action.CancelFlowTimeout(currentState.flowLogic.runId))
}
}
FlowContinuation.ProcessEvents
}
}
/** Returns true if at least one entry of [subFlowStack] is a flow class implementing [TimedFlow]. */
private fun containsTimedFlows(subFlowStack: List<SubFlow>): Boolean =
    subFlowStack.any { subFlow -> TimedFlow::class.java.isAssignableFrom(subFlow.flowClass) }
private fun suspendTransition(event: Event.Suspend): TransitionResult {
return builder {
val newCheckpoint = currentState.checkpoint.copy(
@ -212,7 +232,7 @@ class TopLevelTransition(
val sendEndMessageActions = currentState.checkpoint.sessions.values.mapIndexed { index, state ->
if (state is SessionState.Initiated && state.initiatedState is InitiatedSessionState.Live) {
val message = ExistingSessionMessage(state.initiatedState.peerSinkSessionId, EndSessionMessage)
val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index)
val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state)
Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID))
} else {
null
@ -231,7 +251,7 @@ class TopLevelTransition(
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.party, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow))
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
FlowContinuation.Resume(sessionImpl)
@ -262,4 +282,4 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
}
}

View File

@ -11,7 +11,17 @@
package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.FlowInfo
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ConfirmSessionMessage
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.FlowStart
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
/**
* This transition is responsible for starting the flow from a FlowLogic instance. It creates the first checkpoint and
@ -55,7 +65,8 @@ class UnstartedFlowTransition(
} else {
listOf(DataSessionMessage(initiatingMessage.firstPayload))
},
errors = emptyList()
errors = emptyList(),
deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}"
)
val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo)
val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage)
@ -68,7 +79,7 @@ class UnstartedFlowTransition(
Action.SendExisting(
flowStart.peerSession.counterparty,
sessionMessage,
SenderDeduplicationId(DeduplicationId.createForNormal(currentState.checkpoint, 0), currentState.senderUUID)
SenderDeduplicationId(DeduplicationId.createForNormal(currentState.checkpoint, 0, initiatedState), currentState.senderUUID)
)
)
}

View File

@ -70,9 +70,9 @@ class RetryFlowMockTest {
val messagesSent = mutableListOf<Message>()
val partyB = internalNodeB.info.legalIdentities.first()
internalNodeA.setMessagingServiceSpy(object : MessagingServiceSpy(internalNodeA.network) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message)
messagingService.send(message, target, retryId)
messagingService.send(message, target)
}
})
internalNodeA.startFlow(SendAndRetryFlow(1, partyB)).get()

View File

@ -14,8 +14,18 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.*
import net.corda.core.flows.*
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.sha256
import net.corda.core.crypto.sign
import net.corda.core.flows.NotarisationPayload
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.internal.notary.generateSignature
import net.corda.core.messaging.MessageRecipients
@ -36,7 +46,12 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.InMemoryMessage
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.setMessagingServiceSpy
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
@ -303,7 +318,7 @@ class ValidatingNotaryServiceTests {
private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) {
aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
val messageData = message.data.deserialize<Any>() as? InitialSessionMessage
val payload = messageData?.firstPayload!!.deserialize()
@ -311,10 +326,10 @@ class ValidatingNotaryServiceTests {
val alteredPayload = payloadModifier(payload)
val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize())
val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId)
messagingService.send(alteredMessage, target, retryId)
messagingService.send(alteredMessage, target)
} else {
messagingService.send(message, target, retryId)
messagingService.send(message, target)
}
}
})

View File

@ -430,7 +430,7 @@ class InMemoryMessagingNetwork private constructor(
state.locked { check(handlers.remove(registration as Handler)) }
}
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
check(running)
msgSend(this, message, target)
if (!sendManuallyPumped) {
@ -439,8 +439,8 @@ class InMemoryMessagingNetwork private constructor(
}
override fun send(addressedMessages: List<MessagingService.AddressedMessage>) {
for ((message, target, retryId, sequenceKey) in addressedMessages) {
send(message, target, retryId, sequenceKey)
for ((message, target, sequenceKey) in addressedMessages) {
send(message, target, sequenceKey)
}
}
@ -453,8 +453,6 @@ class InMemoryMessagingNetwork private constructor(
netNodeHasShutdown(peerHandle)
}
override fun cancelRedelivery(retryId: Long) {}
/** Returns the given (topic & session, data) pair as a newly created message object. */
override fun createMessage(topic: String, data: ByteArray, deduplicationId: SenderDeduplicationId, additionalHeaders: Map<String, String>): Message {
return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId.deduplicationId, senderUUID = deduplicationId.senderUUID)