From 4e556942167a92767c20d46c1111e825c606b00d Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Fri, 23 Nov 2018 17:45:36 +0000 Subject: [PATCH] ENT-2356 NotaryServiceFlow backpressure (#4242) --- .ci/api-current.txt | 2 +- .../kotlin/net/corda/core/flows/NotaryFlow.kt | 9 +- .../net/corda/core/flows/NotaryWireFormat.kt | 7 +- .../internal/BackpressureAwareTimedFlow.kt | 36 ++++ .../corda/core/internal/FlowStateMachine.kt | 2 + .../core/internal/notary/NotaryServiceFlow.kt | 38 +++- .../notary/SinglePartyNotaryService.kt | 15 +- .../internal/notary/UniquenessProvider.kt | 15 +- .../corda/notary/raft/RaftNotaryService.kt | 5 +- .../node/services/config/NodeConfiguration.kt | 7 + .../config/schema/v1/ConfigSections.kt | 4 +- .../statemachine/FlowStateMachineImpl.kt | 11 +- .../SingleThreadedStateMachineManager.kt | 67 +++++-- .../transactions/NonValidatingNotaryFlow.kt | 3 +- .../PersistentUniquenessProvider.kt | 70 +++++++- .../transactions/SimpleNotaryService.kt | 5 +- .../transactions/ValidatingNotaryFlow.kt | 3 +- .../net/corda/node/services/TimedFlowTests.kt | 163 ++++++++++++++---- .../node/internal/MockNodeMessagingService.kt | 3 +- 19 files changed, 390 insertions(+), 75 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/internal/BackpressureAwareTimedFlow.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 221cde3cae..e62c8cf108 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2271,7 +2271,7 @@ public final class net.corda.core.flows.NotaryFlow extends java.lang.Object ## @DoNotImplement @InitiatingFlow -public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic implements net.corda.core.internal.TimedFlow +public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.internal.BackpressureAwareTimedFlow public (net.corda.core.transactions.SignedTransaction) public (net.corda.core.transactions.SignedTransaction, net.corda.core.utilities.ProgressTracker) @Suspendable diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index ce28b9a73e..c900630fe4 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -7,8 +7,8 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.identity.Party +import net.corda.core.internal.BackpressureAwareTimedFlow import net.corda.core.internal.FetchDataFlow -import net.corda.core.internal.TimedFlow import net.corda.core.internal.notary.generateSignature import net.corda.core.internal.notary.validateSignatures import net.corda.core.internal.pushToLoggingContext @@ -37,7 +37,7 @@ class NotaryFlow { open class Client( private val stx: SignedTransaction, override val progressTracker: ProgressTracker - ) : FlowLogic>(), TimedFlow { + ) : BackpressureAwareTimedFlow>() { constructor(stx: SignedTransaction) : this(stx, tracker()) companion object { @@ -91,7 +91,7 @@ class NotaryFlow { private fun sendAndReceiveValidating(session: FlowSession, signature: NotarisationRequestSignature): UntrustworthyData { val payload = NotarisationPayload(stx, signature) subFlow(NotarySendTransactionFlow(session, payload)) - return session.receive() + return receiveResultOrTiming(session) } @Suspendable @@ -102,7 +102,8 @@ class NotaryFlow { is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is ReferenceStateRef || it is TimeWindow || it == notaryParty }) else -> ctx } - return session.sendAndReceiveWithRetry(NotarisationPayload(tx, signature)) + session.send(NotarisationPayload(tx, signature)) + return receiveResultOrTiming(session) } /** Checks that the notary's signature(s) is/are valid. */ diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryWireFormat.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryWireFormat.kt index 308c58aa96..6fda179ec7 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryWireFormat.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryWireFormat.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.TransactionSignature import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.CoreTransaction import net.corda.core.transactions.SignedTransaction +import java.time.Duration /** * A notarisation request specifies a list of states to consume and the id of the consuming transaction. Its primary @@ -80,4 +81,8 @@ data class NotarisationPayload(val transaction: Any, val requestSignature: Notar /** Payload returned by the notary service flow to the client. */ @CordaSerializable -data class NotarisationResponse(val signatures: List) \ No newline at end of file +data class NotarisationResponse(val signatures: List) + +/** Sent by the notary when the notary detects it will unlikely respond before the client retries. */ +@CordaSerializable +data class WaitTimeUpdate(val waitTime: Duration) diff --git a/core/src/main/kotlin/net/corda/core/internal/BackpressureAwareTimedFlow.kt b/core/src/main/kotlin/net/corda/core/internal/BackpressureAwareTimedFlow.kt new file mode 100644 index 0000000000..12f7dcebd1 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/BackpressureAwareTimedFlow.kt @@ -0,0 +1,36 @@ +package net.corda.core.internal + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.WaitTimeUpdate +import net.corda.core.utilities.UntrustworthyData + +const val MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE = 4 + +/** + * Implementation of TimedFlow that can handle WaitTimeUpdate messages. Any flow talking to the notary should implement this and use + * explicit send and this class's receiveResultOrTiming to receive the response to handle cases where the notary sends a timeout update. + * + * This is handling the special case of the notary where the notary service will have an internal queue on the uniqueness provider and we + * want to stop retries overwhelming that internal queue. As the TimedFlow mechanism and the notary service back-pressure are very specific + * to this use case at the moment, this implementation is internal and not for general use. + */ +abstract class BackpressureAwareTimedFlow : FlowLogic(), TimedFlow { + @Suspendable + inline fun receiveResultOrTiming(session: FlowSession): UntrustworthyData { + while (true) { + val wrappedResult = session.receive() + val unwrapped = wrappedResult.fromUntrustedWorld + when { + unwrapped is WaitTimeUpdate -> { + logger.info("Counterparty [${session.counterparty}] is busy - TimedFlow $runId has been asked to wait for an additional ${unwrapped.waitTime} seconds for completion.") + stateMachine.updateTimedFlowTimeout(unwrapped.waitTime.seconds) + } + unwrapped is ReceiveType -> @Suppress("UNCHECKED_CAST") // The compiler doesn't understand it's checked in the line above + return wrappedResult as UntrustworthyData + else -> throw throw IllegalArgumentException("We were expecting a ${ReceiveType::class.java.name} or WaitTimeUpdate but we instead got a ${unwrapped.javaClass.name} ($unwrapped)") + } + } + } +} diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index c2f0fa2ca3..d2c8222b0d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -36,6 +36,8 @@ interface FlowStateMachine { @Suspendable fun persistFlowStackSnapshot(flowClass: Class>) + fun updateTimedFlowTimeout(timeoutSeconds: Long) + val logic: FlowLogic val serviceHub: ServiceHub val logger: Logger diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index 701846fad1..7fdfe4a4ee 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -4,9 +4,22 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash -import net.corda.core.flows.* +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.NotarisationPayload +import net.corda.core.flows.NotarisationRequest +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotarisationResponse +import net.corda.core.flows.NotaryError +import net.corda.core.flows.NotaryException +import net.corda.core.flows.NotaryFlow +import net.corda.core.flows.WaitTimeUpdate import net.corda.core.identity.Party +import net.corda.core.internal.MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE +import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap +import java.time.Duration /** * A flow run by a notary service that handles notarisation requests. @@ -15,16 +28,30 @@ import net.corda.core.utilities.unwrap * if any of the input states have been previously committed. * * Additional transaction validation logic can be added when implementing [validateRequest]. + * + * @param otherSideSession The session with the notary client. + * @param service The notary service to utilise. + * @param etaThreshold If the ETA for processing the request, according to the service, is greater than this, notify the client. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? -abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic() { +abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic() { companion object { // TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder. private const val maxAllowedInputsAndReferences = 10_000 + + /** + * This is default wait time estimate for notaries/uniqueness providers that do not estimate wait times. + * Also used as default eta message threshold so that a default wait time/default threshold will never + * lead to an update message being sent. + */ + val defaultEstimatedWaitTime: Duration = 10.seconds } private var transactionId: SecureHash? = null + @Suspendable + private fun counterpartyCanHandleBackPressure() = otherSideSession.getCounterpartyFlowInfo(true).flowVersion >= MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE + @Suspendable override fun call(): Void? { check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) { @@ -39,6 +66,11 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: verifyTransaction(requestPayload) + val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size) + if (eta > etaThreshold && counterpartyCanHandleBackPressure()) { + otherSideSession.send(WaitTimeUpdate(eta)) + } + service.commitInputStates( tx.inputs, tx.id, @@ -135,4 +167,4 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: } /** Exception internal to the notary service. Does not get exposed to CorDapps and flows calling [NotaryFlow.Client]. */ -class NotaryInternalException(val error: NotaryError) : FlowException("Unable to notarise: $error") \ No newline at end of file +class NotaryInternalException(val error: NotaryError) : FlowException("Unable to notarise: $error") diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt index 6239f03824..e27aaea753 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/SinglePartyNotaryService.kt @@ -4,7 +4,11 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow -import net.corda.core.crypto.* +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignableData +import net.corda.core.crypto.SignatureMetadata +import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowLogic import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.identity.Party @@ -14,6 +18,7 @@ import net.corda.core.internal.notary.UniquenessProvider.Result import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.contextLogger import org.slf4j.Logger +import java.time.Duration /** Base implementation for a notary service operated by a singe party. */ abstract class SinglePartyNotaryService : NotaryService() { @@ -42,6 +47,7 @@ abstract class SinglePartyNotaryService : NotaryService() { val callingFlow = FlowLogic.currentTopLevel ?: throw IllegalStateException("This method should be invoked in a flow context.") + val result = callingFlow.executeAsync( CommitOperation( this, @@ -59,6 +65,13 @@ abstract class SinglePartyNotaryService : NotaryService() { } } + /** + * Estimate the wait time to be notarised taking into account the new request size. + * + * @param numStates The number of states we're about to request be notarised. + */ + fun getEstimatedWaitTime(numStates: Int): Duration = uniquenessProvider.getEta(numStates) + /** * Required for the flow to be able to suspend until the commit is complete. * This object will be included in the flow checkpoint. diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt index aebf618af1..fd2646b90e 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/UniquenessProvider.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.identity.Party +import java.time.Duration /** * A service that records input states of the given transaction and provides conflict information @@ -23,6 +24,18 @@ interface UniquenessProvider { references: List = emptyList() ): CordaFuture + /** + * Estimated time of request processing. A uniqueness provider that is aware of their own throughput can return + * an estimate how long requests will be queued before they can be processed. Notary services use this information + * to potentially update clients with an expected wait time in order to avoid spamming by retries when the notary + * gets busy. + * + * @param numStates The number of states (input + reference) in the new request, to be added to the pending count. + */ + fun getEta(numStates: Int): Duration { + return NotaryServiceFlow.defaultEstimatedWaitTime + } + /** The outcome of committing a transaction. */ sealed class Result { /** Indicates that all input states have been committed successfully. */ @@ -30,4 +43,4 @@ interface UniquenessProvider { /** Indicates that the transaction has not been committed. */ data class Failure(val error: NotaryError) : Result() } -} \ No newline at end of file +} diff --git a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt index 1b687b2ec3..023476d7aa 100644 --- a/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt +++ b/experimental/notary-raft/src/main/kotlin/net/corda/notary/raft/RaftNotaryService.kt @@ -3,6 +3,7 @@ package net.corda.notary.raft import net.corda.core.flows.FlowSession import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.utilities.seconds import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.node.services.transactions.ValidatingNotaryFlow @@ -36,8 +37,8 @@ class RaftNotaryService( override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { return if (notaryConfig.validating) { - ValidatingNotaryFlow(otherPartySession, this) - } else NonValidatingNotaryFlow(otherPartySession, this) + ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) + } else NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) } override fun start() { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 7a4293a85d..cd902d8cb5 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -6,6 +6,7 @@ import net.corda.common.validation.internal.Validated import net.corda.core.context.AuthServiceId import net.corda.core.identity.CordaX500Name import net.corda.core.internal.TimedFlow +import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.config.rpc.NodeRpcOptions import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec @@ -140,6 +141,12 @@ data class NotaryConfig( val serviceLegalName: CordaX500Name? = null, /** The name of the notary service class to load. */ val className: String = "net.corda.node.services.transactions.SimpleNotaryService", + /** + * If the wait time estimate on the internal queue exceeds this value, the notary may send + * a wait time update to the client (implementation specific and dependent on the counter + * party version). + */ + val etaMessageThresholdSeconds: Int = NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt(), /** Notary implementation-specific configuration parameters. */ val extraConfig: Config? = null ) diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt index 314f435b14..ce90d9560f 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt @@ -13,6 +13,7 @@ import net.corda.common.configuration.parsing.internal.nested import net.corda.common.validation.internal.Validated.Companion.invalid import net.corda.common.validation.internal.Validated.Companion.valid import net.corda.core.context.AuthServiceId +import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.node.services.config.AuthDataSourceType import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyType @@ -164,10 +165,11 @@ internal object NotaryConfigSpec : Configuration.Specification("No private val validating by boolean() private val serviceLegalName by string().mapValid(::toCordaX500Name).optional() private val className by string().optional().withDefaultValue("net.corda.node.services.transactions.SimpleNotaryService") + private val etaMessageThresholdSeconds by int().optional().withDefaultValue(NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt()) private val extraConfig by nestedObject().map(ConfigObject::toConfig).optional() override fun parseValid(configuration: Config): Valid { - return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[extraConfig])) + return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[etaMessageThresholdSeconds], configuration[extraConfig])) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index a27c0c09b5..d7d875b694 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -71,7 +71,8 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val stateMachine: StateMachine, val serviceHub: ServiceHubInternal, val checkpointSerializationContext: CheckpointSerializationContext, - val unfinishedFibers: ReusableLatch + val unfinishedFibers: ReusableLatch, + val waitTimeUpdateHook: (id: StateMachineRunId, timeout: Long) -> Unit ) internal var transientValues: TransientReference? = null @@ -411,6 +412,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, return transientState!!.value } + /** + * Hook to allow a timed flow to update its own timeout (i.e. how long it can be suspended before it gets + * retried. + */ + override fun updateTimedFlowTimeout(timeoutSeconds: Long) { + getTransientField(TransientValues::waitTimeUpdateHook).invoke(id, timeoutSeconds) + } + override val stateMachine get() = getTransientField(TransientValues::stateMachine) /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 1facf8d478..e82ce6bbfc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -13,7 +13,11 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.* +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.TimedFlow +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.castIfPossible import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture @@ -34,7 +38,11 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion -import net.corda.node.services.statemachine.interceptors.* +import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor +import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker +import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor +import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor +import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.injectOldProgressTracker @@ -47,11 +55,13 @@ import org.apache.logging.log4j.LogManager import rx.Observable import rx.subjects.PublishSubject import java.security.SecureRandom -import java.util.* -import java.util.concurrent.* +import java.util.HashSet +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.ThreadSafe -import kotlin.collections.ArrayList -import kotlin.collections.HashMap import kotlin.streams.toList /** @@ -579,22 +589,54 @@ class SingleThreadedStateMachineManager( if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) scheduledTimeout.retryCount } else 0 - val scheduledFuture = scheduleTimeoutException(flow, retryCount) + val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount)) timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1) } else { logger.warn("Unable to schedule timeout for flow $flowId – flow not found.") } } + private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) { + if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) { + logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." } + return + } + logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." } + mutex.locked { + resetCustomTimeout(flowId, timeoutSeconds) + } + } + + private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) { + val flow = flows[flowId] + if (flow != null) { + val scheduledTimeout = timedFlows[flowId] + val retryCount = if (scheduledTimeout != null) { + val timeoutFuture = scheduledTimeout.scheduledFuture + if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) + scheduledTimeout.retryCount + } else 0 + val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds) + timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount) + } else { + logger.warn("Unable to schedule timeout for flow $flowId – flow not found.") + } + } + /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ - private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { + private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> { return with(serviceHub.configuration.flowTimeout) { - val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() - val jitteredDelaySeconds = maxOf(1L, timeoutDelaySeconds/2 + (Math.random() * timeoutDelaySeconds/2).toLong()) timeoutScheduler.schedule({ val event = Event.Error(FlowTimeoutException(maxRestartCount)) flow.fiber.scheduleEvent(event) - }, jitteredDelaySeconds, TimeUnit.SECONDS) + }, delay, TimeUnit.SECONDS) + } + } + + private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long { + return with(serviceHub.configuration.flowTimeout) { + val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong()) } } @@ -642,7 +684,8 @@ class SingleThreadedStateMachineManager( stateMachine = StateMachine(id, secureRandom), serviceHub = serviceHub, checkpointSerializationContext = checkpointSerializationContext!!, - unfinishedFibers = unfinishedFibers + unfinishedFibers = unfinishedFibers, + waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) } ) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index fa05c15755..d95593af0a 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.transactions.ContractUpgradeFilteredTransaction import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.NotaryChangeWireTransaction +import java.time.Duration /** * The received transaction is not checked for contract-validity, as that would require fully @@ -17,7 +18,7 @@ import net.corda.core.transactions.NotaryChangeWireTransaction * the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently * undo the commit of the input states (the exact mechanism still needs to be worked out). */ -class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { +class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration) : NotaryServiceFlow(otherSideSession, service, etaThreshold) { override fun extractParts(requestPayload: NotarisationPayload): TransactionParts { val tx = requestPayload.coreTransaction return when (tx) { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index 4c71feb333..71b871109a 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -1,5 +1,6 @@ package net.corda.node.services.transactions +import com.codahale.metrics.SlidingWindowReservoir import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow @@ -12,8 +13,10 @@ import net.corda.core.identity.Party import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.internal.elapsedTime import net.corda.core.internal.notary.NotaryInternalException +import net.corda.core.internal.notary.NotaryServiceFlow +import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.isConsumedByTheSameTx import net.corda.core.internal.notary.validateTimeWindow import net.corda.core.schemas.PersistentStateRef @@ -27,11 +30,20 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession import java.time.Clock +import java.time.Duration import java.time.Instant -import java.util.* +import java.util.LinkedHashMap import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.ThreadSafe -import javax.persistence.* +import javax.persistence.Column +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.GeneratedValue +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.MappedSuperclass import kotlin.concurrent.thread /** A RDBMS backed Uniqueness provider */ @@ -94,6 +106,34 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste private val commitLog = createMap(cacheFactory) private val requestQueue = LinkedBlockingQueue(requestQueueSize) + private val nrQueuedStates = AtomicInteger(0) + + /** + * Measured in states per minute, with a minimum of 1. We take an average of the last 100 commits. + * Minutes was chosen to increase accuracy by 60x over seconds, given we have to use longs here. + */ + private val throughputHistory = SlidingWindowReservoir(100) + @Volatile + var throughput: Double = 0.0 + + /** + * Estimated time of request processing. + * This uses performance metrics to gauge how long the wait time for a newly queued state will probably be. + * It checks that there is actual traffic going on (i.e. a non-zero number of states are queued and there + * is actual throughput) and then returns the expected wait time scaled up by a factor of 2 to give a probable + * upper bound. + * + * @param numStates The number of states (input + reference) we're about to request be notarised. + */ + override fun getEta(numStates: Int): Duration { + val rate = throughput + val nrStates = nrQueuedStates.getAndAdd(numStates) + log.debug { "rate: $rate, queueSize: $nrStates" } + if (rate > 0.0 && nrStates > 0) { + return Duration.ofSeconds((2 * TimeUnit.MINUTES.toSeconds(1) * nrStates / rate).toLong()) + } + return NotaryServiceFlow.defaultEstimatedWaitTime + } /** A request processor thread. */ private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) { @@ -252,9 +292,21 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste } } + private fun decrementQueueSize(request: CommitRequest): Int { + val nrStates = request.states.size + request.references.size + nrQueuedStates.addAndGet(-nrStates) + return nrStates + } + private fun processRequest(request: CommitRequest) { + val numStates = decrementQueueSize(request) try { - commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references) + val duration = elapsedTime { + commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references) + } + val statesPerMinute = numStates.toLong() * TimeUnit.MINUTES.toNanos(1) / duration.toNanos() + throughputHistory.update(maxOf(statesPerMinute, 1)) + throughput = throughputHistory.snapshot.median // Median deemed more stable / representative than mean. respondWithSuccess(request) } catch (e: Exception) { log.warn("Error processing commit request", e) @@ -263,11 +315,11 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste } private fun respondWithError(request: CommitRequest, exception: Exception) { - if (exception is NotaryInternalException) { - request.future.set(UniquenessProvider.Result.Failure(exception.error)) - } else { - request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) - } + if (exception is NotaryInternalException) { + request.future.set(UniquenessProvider.Result.Failure(exception.error)) + } else { + request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) + } } private fun respondWithSuccess(request: CommitRequest) { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index 713aa27747..3bbf32ef66 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -4,6 +4,7 @@ import net.corda.core.flows.FlowSession import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.core.schemas.MappedSchema +import net.corda.core.utilities.seconds import net.corda.node.services.api.ServiceHubInternal import java.security.PublicKey @@ -17,10 +18,10 @@ class SimpleNotaryService(override val services: ServiceHubInternal, override va override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow { return if (notaryConfig.validating) { log.info("Starting in validating mode") - ValidatingNotaryFlow(otherPartySession, this) + ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) } else { log.info("Starting in non-validating mode") - NonValidatingNotaryFlow(otherPartySession, this) + NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index 94019849db..7fd7f2278d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -12,6 +12,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction +import java.time.Duration /** * A notary commit flow that makes sure a given transaction is valid before committing it. This does mean that the calling @@ -19,7 +20,7 @@ import net.corda.core.transactions.WireTransaction * has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was * indeed valid. */ -open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) { +open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration = defaultEstimatedWaitTime) : NotaryServiceFlow(otherSideSession, service, etaThreshold) { override fun extractParts(requestPayload: NotarisationPayload): TransactionParts { val stx = requestPayload.signedTransaction val timeWindow: TimeWindow? = if (stx.coreTransaction is WireTransaction) stx.tx.timeWindow else null diff --git a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt index e5d63e7bdd..19edce0d3d 100644 --- a/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -6,22 +6,27 @@ import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash -import net.corda.core.flows.* +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest -import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.node.NotaryInfo import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.transactions.ValidatingNotaryFlow +import net.corda.node.services.transactions.NonValidatingNotaryFlow import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters @@ -29,17 +34,28 @@ import net.corda.testing.contracts.DummyContract import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity import net.corda.testing.internal.LogHelper -import net.corda.testing.node.* -import net.corda.testing.node.internal.* +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetFlowTimeOut +import net.corda.testing.node.MockNetNotaryConfig +import net.corda.testing.node.MockNetworkParameters +import net.corda.testing.node.MockNodeConfigOverrides +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.cordappsForPackages +import net.corda.testing.node.internal.startFlow import org.junit.AfterClass import org.junit.Before import org.junit.BeforeClass import org.junit.Test -import org.slf4j.MDC import java.security.PublicKey +import java.time.Duration import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger import kotlin.test.assertNotEquals +import kotlin.test.assertTrue class TimedFlowTests { companion object { @@ -51,6 +67,10 @@ class TimedFlowTests { private lateinit var mockNet: InternalMockNetwork private lateinit var notary: Party private lateinit var node: TestStartedNode + private lateinit var patientNode: TestStartedNode + + private val waitEtaThreshold: Duration = NotaryServiceFlow.defaultEstimatedWaitTime + private var waitETA: Duration = waitEtaThreshold init { LogHelper.setLevel("+net.corda.flow", "+net.corda.testing.node", "+net.corda.node.services.messaging") @@ -67,6 +87,8 @@ class TimedFlowTests { val started = startClusterAndNode(mockNet) notary = started.first node = started.second + patientNode = started.third + } @AfterClass @@ -75,17 +97,17 @@ class TimedFlowTests { mockNet.stopNodes() } - private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair { + private fun startClusterAndNode(mockNet: InternalMockNetwork): Triple { val replicaIds = (0 until CLUSTER_SIZE) val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH") val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity( replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) }, serviceLegalName) - val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true)))) + val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false)))) val notaryConfig = MockNetNotaryConfig( serviceLegalName = serviceLegalName, - validating = true, + validating = false, className = TestNotaryService::class.java.name ) @@ -98,18 +120,26 @@ class TimedFlowTests { val aliceNode = mockNet.createUnstartedNode( InternalMockNodeParameters( legalName = CordaX500Name("Alice", "AliceCorp", "GB"), - configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(1.seconds, 3, 1.0)) + configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(2.seconds, 3, 1.0)) ) ) + val patientNode = mockNet.createUnstartedNode( + InternalMockNodeParameters( + legalName = CordaX500Name("Bob", "BobCorp", "GB"), + configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(10.seconds, 3, 1.0)) + ) + ) + + // MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the // network-parameters in their directories before they're started. - val node = (notaryNodes + aliceNode).map { node -> + val nodes = (notaryNodes + aliceNode + patientNode).map { node -> networkParameters.install(mockNet.baseDirectory(node.id)) node.start() - }.last() + } - return Pair(notaryIdentity, node) + return Triple(notaryIdentity, nodes[nodes.lastIndex - 1], nodes.last()) } } @@ -154,6 +184,70 @@ class TimedFlowTests { } } + @Test + fun `timed flow can update its ETA`() { + try { + waitETA = 10.minutes + node.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val flow = NotaryFlow.Client(issueTx) + val progressTracker = flow.progressTracker + assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep) + val progressTrackerDone = getDoneFuture(progressTracker) + + val resultFuture = services.startFlow(flow).resultFuture + var exceptionThrown = false + try { + resultFuture.get(3, TimeUnit.SECONDS) + } catch (e: TimeoutException) { + exceptionThrown = true + } + assertTrue(exceptionThrown) + flow.stateMachine.updateTimedFlowTimeout(2) + val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS) + (issueTx + notarySignatures).verifyRequiredSignatures() + progressTrackerDone.get() + } + } finally { + waitETA = waitEtaThreshold + } + } + + @Test + fun `timed flow cannot update its ETA to less than default`() { + try { + waitETA = 1.seconds + patientNode.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val flow = NotaryFlow.Client(issueTx) + val progressTracker = flow.progressTracker + assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep) + val progressTrackerDone = getDoneFuture(progressTracker) + + val resultFuture = services.startFlow(flow).resultFuture + flow.stateMachine.updateTimedFlowTimeout(1) + var exceptionThrown = false + try { + resultFuture.get(3, TimeUnit.SECONDS) + } catch (e: TimeoutException) { + exceptionThrown = true + } + assertTrue(exceptionThrown) + val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS) + (issueTx + notarySignatures).verifyRequiredSignatures() + progressTrackerDone.get() + } + } finally { + waitETA = waitEtaThreshold + } + } + private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { return services.signInitialTransaction( TransactionBuilder(notary).apply { @@ -170,6 +264,10 @@ class TimedFlowTests { }.bufferUntilSubscribed().toBlocking().toFuture() } + /** + * A test notary service that will just stop forever the first time you invoke its commitInputStates method and will succeed the + * second time around. + */ private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() { override val uniquenessProvider = object : UniquenessProvider { /** A dummy commit method that immediately returns a success message. */ @@ -178,30 +276,27 @@ class TimedFlowTests { set(UniquenessProvider.Result.Success) } } + + override fun getEta(numStates: Int): Duration = waitETA } - override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = TestNotaryFlow(otherPartySession, this) + @Suspendable + override fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) { + val callingFlow = FlowLogic.currentTopLevel + ?: throw IllegalStateException("This method should be invoked in a flow context.") + + if (requestsReceived.getAndIncrement() == 0) { + log.info("Ignoring") + // Waiting forever + callingFlow.stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false) + } else { + log.info("Processing") + super.commitInputStates(inputs, txId, caller, requestSignature, timeWindow, references) + } + } + + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = NonValidatingNotaryFlow(otherPartySession, this, waitEtaThreshold) override fun start() {} override fun stop() {} } - - /** A notary flow that will yield without returning a response on the very first received request. */ - private class TestNotaryFlow(otherSide: FlowSession, service: TestNotaryService) : ValidatingNotaryFlow(otherSide, service) { - @Suspendable - override fun verifyTransaction(requestPayload: NotarisationPayload) { - val myIdentity = serviceHub.myInfo.legalIdentities.first() - MDC.put("name", myIdentity.name.toString()) - logger.info("Received a request from ${otherSideSession.counterparty.name}") - val stx = requestPayload.signedTransaction - subFlow(ResolveTransactionsFlow(stx, otherSideSession)) - - if (requestsReceived.getAndIncrement() == 0) { - logger.info("Ignoring") - // Waiting forever - stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false) - } else { - logger.info("Processing") - } - } - } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt index a33b2823a0..89c60509ec 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt @@ -2,6 +2,7 @@ package net.corda.testing.node.internal import net.corda.core.identity.CordaX500Name import net.corda.core.identity.PartyAndCertificate +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.ThreadBox import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo @@ -243,7 +244,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration, return InMemoryReceivedMessage( message.topic, OpaqueBytes(message.data.bytes.copyOf()), // Kryo messes with the buffer so give each client a unique copy - 1, + PLATFORM_VERSION, message.uniqueMessageId, message.debugTimestamp, sender.name