ENT-2356 NotaryServiceFlow backpressure (#4242)

This commit is contained in:
Thomas Schroeter 2018-11-23 17:45:36 +00:00 committed by Rick Parker
parent 2c182dd158
commit 4e55694216
19 changed files with 390 additions and 75 deletions

View File

@ -2271,7 +2271,7 @@ public final class net.corda.core.flows.NotaryFlow extends java.lang.Object
##
@DoNotImplement
@InitiatingFlow
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.flows.FlowLogic implements net.corda.core.internal.TimedFlow
public static class net.corda.core.flows.NotaryFlow$Client extends net.corda.core.internal.BackpressureAwareTimedFlow
public <init>(net.corda.core.transactions.SignedTransaction)
public <init>(net.corda.core.transactions.SignedTransaction, net.corda.core.utilities.ProgressTracker)
@Suspendable

View File

@ -7,8 +7,8 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.identity.Party
import net.corda.core.internal.BackpressureAwareTimedFlow
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.notary.generateSignature
import net.corda.core.internal.notary.validateSignatures
import net.corda.core.internal.pushToLoggingContext
@ -37,7 +37,7 @@ class NotaryFlow {
open class Client(
private val stx: SignedTransaction,
override val progressTracker: ProgressTracker
) : FlowLogic<List<TransactionSignature>>(), TimedFlow {
) : BackpressureAwareTimedFlow<List<TransactionSignature>>() {
constructor(stx: SignedTransaction) : this(stx, tracker())
companion object {
@ -91,7 +91,7 @@ class NotaryFlow {
private fun sendAndReceiveValidating(session: FlowSession, signature: NotarisationRequestSignature): UntrustworthyData<NotarisationResponse> {
val payload = NotarisationPayload(stx, signature)
subFlow(NotarySendTransactionFlow(session, payload))
return session.receive()
return receiveResultOrTiming(session)
}
@Suspendable
@ -102,7 +102,8 @@ class NotaryFlow {
is WireTransaction -> ctx.buildFilteredTransaction(Predicate { it is StateRef || it is ReferenceStateRef || it is TimeWindow || it == notaryParty })
else -> ctx
}
return session.sendAndReceiveWithRetry(NotarisationPayload(tx, signature))
session.send(NotarisationPayload(tx, signature))
return receiveResultOrTiming(session)
}
/** Checks that the notary's signature(s) is/are valid. */

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.SignedTransaction
import java.time.Duration
/**
* A notarisation request specifies a list of states to consume and the id of the consuming transaction. Its primary
@ -80,4 +81,8 @@ data class NotarisationPayload(val transaction: Any, val requestSignature: Notar
/** Payload returned by the notary service flow to the client. */
@CordaSerializable
data class NotarisationResponse(val signatures: List<TransactionSignature>)
data class NotarisationResponse(val signatures: List<TransactionSignature>)
/** Sent by the notary when the notary detects it will unlikely respond before the client retries. */
@CordaSerializable
data class WaitTimeUpdate(val waitTime: Duration)

View File

@ -0,0 +1,36 @@
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.WaitTimeUpdate
import net.corda.core.utilities.UntrustworthyData
const val MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE = 4
/**
* Implementation of TimedFlow that can handle WaitTimeUpdate messages. Any flow talking to the notary should implement this and use
* explicit send and this class's receiveResultOrTiming to receive the response to handle cases where the notary sends a timeout update.
*
* This is handling the special case of the notary where the notary service will have an internal queue on the uniqueness provider and we
* want to stop retries overwhelming that internal queue. As the TimedFlow mechanism and the notary service back-pressure are very specific
* to this use case at the moment, this implementation is internal and not for general use.
*/
abstract class BackpressureAwareTimedFlow<ResultType> : FlowLogic<ResultType>(), TimedFlow {
@Suspendable
inline fun <reified ReceiveType> receiveResultOrTiming(session: FlowSession): UntrustworthyData<ReceiveType> {
while (true) {
val wrappedResult = session.receive<Any>()
val unwrapped = wrappedResult.fromUntrustedWorld
when {
unwrapped is WaitTimeUpdate -> {
logger.info("Counterparty [${session.counterparty}] is busy - TimedFlow $runId has been asked to wait for an additional ${unwrapped.waitTime} seconds for completion.")
stateMachine.updateTimedFlowTimeout(unwrapped.waitTime.seconds)
}
unwrapped is ReceiveType -> @Suppress("UNCHECKED_CAST") // The compiler doesn't understand it's checked in the line above
return wrappedResult as UntrustworthyData<ReceiveType>
else -> throw throw IllegalArgumentException("We were expecting a ${ReceiveType::class.java.name} or WaitTimeUpdate but we instead got a ${unwrapped.javaClass.name} ($unwrapped)")
}
}
}
}

View File

@ -36,6 +36,8 @@ interface FlowStateMachine<FLOWRETURN> {
@Suspendable
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>)
fun updateTimedFlowTimeout(timeoutSeconds: Long)
val logic: FlowLogic<FLOWRETURN>
val serviceHub: ServiceHub
val logger: Logger

View File

@ -4,9 +4,22 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotarisationPayload
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotarisationResponse
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.WaitTimeUpdate
import net.corda.core.identity.Party
import net.corda.core.internal.MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import java.time.Duration
/**
* A flow run by a notary service that handles notarisation requests.
@ -15,16 +28,30 @@ import net.corda.core.utilities.unwrap
* if any of the input states have been previously committed.
*
* Additional transaction validation logic can be added when implementing [validateRequest].
*
* @param otherSideSession The session with the notary client.
* @param service The notary service to utilise.
* @param etaThreshold If the ETA for processing the request, according to the service, is greater than this, notify the client.
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService) : FlowLogic<Void?>() {
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic<Void?>() {
companion object {
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
private const val maxAllowedInputsAndReferences = 10_000
/**
* This is default wait time estimate for notaries/uniqueness providers that do not estimate wait times.
* Also used as default eta message threshold so that a default wait time/default threshold will never
* lead to an update message being sent.
*/
val defaultEstimatedWaitTime: Duration = 10.seconds
}
private var transactionId: SecureHash? = null
@Suspendable
private fun counterpartyCanHandleBackPressure() = otherSideSession.getCounterpartyFlowInfo(true).flowVersion >= MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
@Suspendable
override fun call(): Void? {
check(serviceHub.myInfo.legalIdentities.any { serviceHub.networkMapCache.isNotary(it) }) {
@ -39,6 +66,11 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
verifyTransaction(requestPayload)
val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
if (eta > etaThreshold && counterpartyCanHandleBackPressure()) {
otherSideSession.send(WaitTimeUpdate(eta))
}
service.commitInputStates(
tx.inputs,
tx.id,
@ -135,4 +167,4 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
}
/** Exception internal to the notary service. Does not get exposed to CorDapps and flows calling [NotaryFlow.Client]. */
class NotaryInternalException(val error: NotaryError) : FlowException("Unable to notarise: $error")
class NotaryInternalException(val error: NotaryError) : FlowException("Unable to notarise: $error")

View File

@ -4,7 +4,11 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
@ -14,6 +18,7 @@ import net.corda.core.internal.notary.UniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
import java.time.Duration
/** Base implementation for a notary service operated by a singe party. */
abstract class SinglePartyNotaryService : NotaryService() {
@ -42,6 +47,7 @@ abstract class SinglePartyNotaryService : NotaryService() {
val callingFlow = FlowLogic.currentTopLevel
?: throw IllegalStateException("This method should be invoked in a flow context.")
val result = callingFlow.executeAsync(
CommitOperation(
this,
@ -59,6 +65,13 @@ abstract class SinglePartyNotaryService : NotaryService() {
}
}
/**
* Estimate the wait time to be notarised taking into account the new request size.
*
* @param numStates The number of states we're about to request be notarised.
*/
fun getEstimatedWaitTime(numStates: Int): Duration = uniquenessProvider.getEta(numStates)
/**
* Required for the flow to be able to suspend until the commit is complete.
* This object will be included in the flow checkpoint.

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
import java.time.Duration
/**
* A service that records input states of the given transaction and provides conflict information
@ -23,6 +24,18 @@ interface UniquenessProvider {
references: List<StateRef> = emptyList()
): CordaFuture<Result>
/**
* Estimated time of request processing. A uniqueness provider that is aware of their own throughput can return
* an estimate how long requests will be queued before they can be processed. Notary services use this information
* to potentially update clients with an expected wait time in order to avoid spamming by retries when the notary
* gets busy.
*
* @param numStates The number of states (input + reference) in the new request, to be added to the pending count.
*/
fun getEta(numStates: Int): Duration {
return NotaryServiceFlow.defaultEstimatedWaitTime
}
/** The outcome of committing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
@ -30,4 +43,4 @@ interface UniquenessProvider {
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()
}
}
}

View File

@ -3,6 +3,7 @@ package net.corda.notary.raft
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.node.services.transactions.ValidatingNotaryFlow
@ -36,8 +37,8 @@ class RaftNotaryService(
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
return if (notaryConfig.validating) {
ValidatingNotaryFlow(otherPartySession, this)
} else NonValidatingNotaryFlow(otherPartySession, this)
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
} else NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
}
override fun start() {

View File

@ -6,6 +6,7 @@ import net.corda.common.validation.internal.Validated
import net.corda.core.context.AuthServiceId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.rpc.NodeRpcOptions
import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec
@ -140,6 +141,12 @@ data class NotaryConfig(
val serviceLegalName: CordaX500Name? = null,
/** The name of the notary service class to load. */
val className: String = "net.corda.node.services.transactions.SimpleNotaryService",
/**
* If the wait time estimate on the internal queue exceeds this value, the notary may send
* a wait time update to the client (implementation specific and dependent on the counter
* party version).
*/
val etaMessageThresholdSeconds: Int = NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt(),
/** Notary implementation-specific configuration parameters. */
val extraConfig: Config? = null
)

View File

@ -13,6 +13,7 @@ import net.corda.common.configuration.parsing.internal.nested
import net.corda.common.validation.internal.Validated.Companion.invalid
import net.corda.common.validation.internal.Validated.Companion.valid
import net.corda.core.context.AuthServiceId
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.services.config.AuthDataSourceType
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.CertChainPolicyType
@ -164,10 +165,11 @@ internal object NotaryConfigSpec : Configuration.Specification<NotaryConfig>("No
private val validating by boolean()
private val serviceLegalName by string().mapValid(::toCordaX500Name).optional()
private val className by string().optional().withDefaultValue("net.corda.node.services.transactions.SimpleNotaryService")
private val etaMessageThresholdSeconds by int().optional().withDefaultValue(NotaryServiceFlow.defaultEstimatedWaitTime.seconds.toInt())
private val extraConfig by nestedObject().map(ConfigObject::toConfig).optional()
override fun parseValid(configuration: Config): Valid<NotaryConfig> {
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[extraConfig]))
return valid(NotaryConfig(configuration[validating], configuration[serviceLegalName], configuration[className], configuration[etaMessageThresholdSeconds], configuration[extraConfig]))
}
}

View File

@ -71,7 +71,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val stateMachine: StateMachine,
val serviceHub: ServiceHubInternal,
val checkpointSerializationContext: CheckpointSerializationContext,
val unfinishedFibers: ReusableLatch
val unfinishedFibers: ReusableLatch,
val waitTimeUpdateHook: (id: StateMachineRunId, timeout: Long) -> Unit
)
internal var transientValues: TransientReference<TransientValues>? = null
@ -411,6 +412,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
return transientState!!.value
}
/**
* Hook to allow a timed flow to update its own timeout (i.e. how long it can be suspended before it gets
* retried.
*/
override fun updateTimedFlowTimeout(timeoutSeconds: Long) {
getTransientField(TransientValues::waitTimeUpdateHook).invoke(id, timeoutSeconds)
}
override val stateMachine get() = getTransientField(TransientValues::stateMachine)
/**

View File

@ -13,7 +13,11 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
@ -34,7 +38,11 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.injectOldProgressTracker
@ -47,11 +55,13 @@ import org.apache.logging.log4j.LogManager
import rx.Observable
import rx.subjects.PublishSubject
import java.security.SecureRandom
import java.util.*
import java.util.concurrent.*
import java.util.HashSet
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.streams.toList
/**
@ -579,22 +589,54 @@ class SingleThreadedStateMachineManager(
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
scheduledTimeout.retryCount
} else 0
val scheduledFuture = scheduleTimeoutException(flow, retryCount)
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1)
} else {
logger.warn("Unable to schedule timeout for flow $flowId flow not found.")
}
}
private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
return
}
logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
mutex.locked {
resetCustomTimeout(flowId, timeoutSeconds)
}
}
private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
val flow = flows[flowId]
if (flow != null) {
val scheduledTimeout = timedFlows[flowId]
val retryCount = if (scheduledTimeout != null) {
val timeoutFuture = scheduledTimeout.scheduledFuture
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
scheduledTimeout.retryCount
} else 0
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount)
} else {
logger.warn("Unable to schedule timeout for flow $flowId flow not found.")
}
}
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
return with(serviceHub.configuration.flowTimeout) {
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
val jitteredDelaySeconds = maxOf(1L, timeoutDelaySeconds/2 + (Math.random() * timeoutDelaySeconds/2).toLong())
timeoutScheduler.schedule({
val event = Event.Error(FlowTimeoutException(maxRestartCount))
flow.fiber.scheduleEvent(event)
}, jitteredDelaySeconds, TimeUnit.SECONDS)
}, delay, TimeUnit.SECONDS)
}
}
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
return with(serviceHub.configuration.flowTimeout) {
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
}
}
@ -642,7 +684,8 @@ class SingleThreadedStateMachineManager(
stateMachine = StateMachine(id, secureRandom),
serviceHub = serviceHub,
checkpointSerializationContext = checkpointSerializationContext!!,
unfinishedFibers = unfinishedFibers
unfinishedFibers = unfinishedFibers,
waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) }
)
}

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.transactions.ContractUpgradeFilteredTransaction
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
import java.time.Duration
/**
* The received transaction is not checked for contract-validity, as that would require fully
@ -17,7 +18,7 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
* the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently
* undo the commit of the input states (the exact mechanism still needs to be worked out).
*/
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration) : NotaryServiceFlow(otherSideSession, service, etaThreshold) {
override fun extractParts(requestPayload: NotarisationPayload): TransactionParts {
val tx = requestPayload.coreTransaction
return when (tx) {

View File

@ -1,5 +1,6 @@
package net.corda.node.services.transactions
import com.codahale.metrics.SlidingWindowReservoir
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
@ -12,8 +13,10 @@ import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.elapsedTime
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.schemas.PersistentStateRef
@ -27,11 +30,20 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.LinkedHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import javax.persistence.GeneratedValue
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.MappedSuperclass
import kotlin.concurrent.thread
/** A RDBMS backed Uniqueness provider */
@ -94,6 +106,34 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
private val commitLog = createMap(cacheFactory)
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
private val nrQueuedStates = AtomicInteger(0)
/**
* Measured in states per minute, with a minimum of 1. We take an average of the last 100 commits.
* Minutes was chosen to increase accuracy by 60x over seconds, given we have to use longs here.
*/
private val throughputHistory = SlidingWindowReservoir(100)
@Volatile
var throughput: Double = 0.0
/**
* Estimated time of request processing.
* This uses performance metrics to gauge how long the wait time for a newly queued state will probably be.
* It checks that there is actual traffic going on (i.e. a non-zero number of states are queued and there
* is actual throughput) and then returns the expected wait time scaled up by a factor of 2 to give a probable
* upper bound.
*
* @param numStates The number of states (input + reference) we're about to request be notarised.
*/
override fun getEta(numStates: Int): Duration {
val rate = throughput
val nrStates = nrQueuedStates.getAndAdd(numStates)
log.debug { "rate: $rate, queueSize: $nrStates" }
if (rate > 0.0 && nrStates > 0) {
return Duration.ofSeconds((2 * TimeUnit.MINUTES.toSeconds(1) * nrStates / rate).toLong())
}
return NotaryServiceFlow.defaultEstimatedWaitTime
}
/** A request processor thread. */
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
@ -252,9 +292,21 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
}
}
private fun decrementQueueSize(request: CommitRequest): Int {
val nrStates = request.states.size + request.references.size
nrQueuedStates.addAndGet(-nrStates)
return nrStates
}
private fun processRequest(request: CommitRequest) {
val numStates = decrementQueueSize(request)
try {
commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
val duration = elapsedTime {
commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
}
val statesPerMinute = numStates.toLong() * TimeUnit.MINUTES.toNanos(1) / duration.toNanos()
throughputHistory.update(maxOf(statesPerMinute, 1))
throughput = throughputHistory.snapshot.median // Median deemed more stable / representative than mean.
respondWithSuccess(request)
} catch (e: Exception) {
log.warn("Error processing commit request", e)
@ -263,11 +315,11 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
}
private fun respondWithError(request: CommitRequest, exception: Exception) {
if (exception is NotaryInternalException) {
request.future.set(UniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
if (exception is NotaryInternalException) {
request.future.set(UniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
}
private fun respondWithSuccess(request: CommitRequest) {

View File

@ -4,6 +4,7 @@ import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.seconds
import net.corda.node.services.api.ServiceHubInternal
import java.security.PublicKey
@ -17,10 +18,10 @@ class SimpleNotaryService(override val services: ServiceHubInternal, override va
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
return if (notaryConfig.validating) {
log.info("Starting in validating mode")
ValidatingNotaryFlow(otherPartySession, this)
ValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
} else {
log.info("Starting in non-validating mode")
NonValidatingNotaryFlow(otherPartySession, this)
NonValidatingNotaryFlow(otherPartySession, this, notaryConfig.etaMessageThresholdSeconds.seconds)
}
}

View File

@ -12,6 +12,7 @@ import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionWithSignatures
import net.corda.core.transactions.WireTransaction
import java.time.Duration
/**
* A notary commit flow that makes sure a given transaction is valid before committing it. This does mean that the calling
@ -19,7 +20,7 @@ import net.corda.core.transactions.WireTransaction
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
* indeed valid.
*/
open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService) : NotaryServiceFlow(otherSideSession, service) {
open class ValidatingNotaryFlow(otherSideSession: FlowSession, service: SinglePartyNotaryService, etaThreshold: Duration = defaultEstimatedWaitTime) : NotaryServiceFlow(otherSideSession, service, etaThreshold) {
override fun extractParts(requestPayload: NotarisationPayload): TransactionParts {
val stx = requestPayload.signedTransaction
val timeWindow: TimeWindow? = if (stx.coreTransaction is WireTransaction) stx.tx.timeWindow else null

View File

@ -6,22 +6,27 @@ import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.ValidatingNotaryFlow
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
@ -29,17 +34,28 @@ import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.*
import net.corda.testing.node.internal.*
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetFlowTimeOut
import net.corda.testing.node.MockNetNotaryConfig
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.MockNodeConfigOverrides
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.startFlow
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.slf4j.MDC
import java.security.PublicKey
import java.time.Duration
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertNotEquals
import kotlin.test.assertTrue
class TimedFlowTests {
companion object {
@ -51,6 +67,10 @@ class TimedFlowTests {
private lateinit var mockNet: InternalMockNetwork
private lateinit var notary: Party
private lateinit var node: TestStartedNode
private lateinit var patientNode: TestStartedNode
private val waitEtaThreshold: Duration = NotaryServiceFlow.defaultEstimatedWaitTime
private var waitETA: Duration = waitEtaThreshold
init {
LogHelper.setLevel("+net.corda.flow", "+net.corda.testing.node", "+net.corda.node.services.messaging")
@ -67,6 +87,8 @@ class TimedFlowTests {
val started = startClusterAndNode(mockNet)
notary = started.first
node = started.second
patientNode = started.third
}
@AfterClass
@ -75,17 +97,17 @@ class TimedFlowTests {
mockNet.stopNodes()
}
private fun startClusterAndNode(mockNet: InternalMockNetwork): Pair<Party, TestStartedNode> {
private fun startClusterAndNode(mockNet: InternalMockNetwork): Triple<Party, TestStartedNode, TestStartedNode> {
val replicaIds = (0 until CLUSTER_SIZE)
val serviceLegalName = CordaX500Name("Custom Notary", "Zurich", "CH")
val notaryIdentity = DevIdentityGenerator.generateDistributedNotaryCompositeIdentity(
replicaIds.map { mockNet.baseDirectory(mockNet.nextNodeId + it) },
serviceLegalName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, true))))
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryIdentity, false))))
val notaryConfig = MockNetNotaryConfig(
serviceLegalName = serviceLegalName,
validating = true,
validating = false,
className = TestNotaryService::class.java.name
)
@ -98,18 +120,26 @@ class TimedFlowTests {
val aliceNode = mockNet.createUnstartedNode(
InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(1.seconds, 3, 1.0))
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(2.seconds, 3, 1.0))
)
)
val patientNode = mockNet.createUnstartedNode(
InternalMockNodeParameters(
legalName = CordaX500Name("Bob", "BobCorp", "GB"),
configOverrides = MockNodeConfigOverrides(flowTimeout = MockNetFlowTimeOut(10.seconds, 3, 1.0))
)
)
// MockNetwork doesn't support notary clusters, so we create all the nodes we need unstarted, and then install the
// network-parameters in their directories before they're started.
val node = (notaryNodes + aliceNode).map { node ->
val nodes = (notaryNodes + aliceNode + patientNode).map { node ->
networkParameters.install(mockNet.baseDirectory(node.id))
node.start()
}.last()
}
return Pair(notaryIdentity, node)
return Triple(notaryIdentity, nodes[nodes.lastIndex - 1], nodes.last())
}
}
@ -154,6 +184,70 @@ class TimedFlowTests {
}
}
@Test
fun `timed flow can update its ETA`() {
try {
waitETA = 10.minutes
node.run {
val issueTx = signInitialTransaction(notary) {
setTimeWindow(services.clock.instant(), 30.seconds)
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
val flow = NotaryFlow.Client(issueTx)
val progressTracker = flow.progressTracker
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
val progressTrackerDone = getDoneFuture(progressTracker)
val resultFuture = services.startFlow(flow).resultFuture
var exceptionThrown = false
try {
resultFuture.get(3, TimeUnit.SECONDS)
} catch (e: TimeoutException) {
exceptionThrown = true
}
assertTrue(exceptionThrown)
flow.stateMachine.updateTimedFlowTimeout(2)
val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS)
(issueTx + notarySignatures).verifyRequiredSignatures()
progressTrackerDone.get()
}
} finally {
waitETA = waitEtaThreshold
}
}
@Test
fun `timed flow cannot update its ETA to less than default`() {
try {
waitETA = 1.seconds
patientNode.run {
val issueTx = signInitialTransaction(notary) {
setTimeWindow(services.clock.instant(), 30.seconds)
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
val flow = NotaryFlow.Client(issueTx)
val progressTracker = flow.progressTracker
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
val progressTrackerDone = getDoneFuture(progressTracker)
val resultFuture = services.startFlow(flow).resultFuture
flow.stateMachine.updateTimedFlowTimeout(1)
var exceptionThrown = false
try {
resultFuture.get(3, TimeUnit.SECONDS)
} catch (e: TimeoutException) {
exceptionThrown = true
}
assertTrue(exceptionThrown)
val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS)
(issueTx + notarySignatures).verifyRequiredSignatures()
progressTrackerDone.get()
}
} finally {
waitETA = waitEtaThreshold
}
}
private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
return services.signInitialTransaction(
TransactionBuilder(notary).apply {
@ -170,6 +264,10 @@ class TimedFlowTests {
}.bufferUntilSubscribed().toBlocking().toFuture()
}
/**
* A test notary service that will just stop forever the first time you invoke its commitInputStates method and will succeed the
* second time around.
*/
private class TestNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
override val uniquenessProvider = object : UniquenessProvider {
/** A dummy commit method that immediately returns a success message. */
@ -178,30 +276,27 @@ class TimedFlowTests {
set(UniquenessProvider.Result.Success)
}
}
override fun getEta(numStates: Int): Duration = waitETA
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = TestNotaryFlow(otherPartySession, this)
@Suspendable
override fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
val callingFlow = FlowLogic.currentTopLevel
?: throw IllegalStateException("This method should be invoked in a flow context.")
if (requestsReceived.getAndIncrement() == 0) {
log.info("Ignoring")
// Waiting forever
callingFlow.stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
} else {
log.info("Processing")
super.commitInputStates(inputs, txId, caller, requestSignature, timeWindow, references)
}
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this, waitEtaThreshold)
override fun start() {}
override fun stop() {}
}
/** A notary flow that will yield without returning a response on the very first received request. */
private class TestNotaryFlow(otherSide: FlowSession, service: TestNotaryService) : ValidatingNotaryFlow(otherSide, service) {
@Suspendable
override fun verifyTransaction(requestPayload: NotarisationPayload) {
val myIdentity = serviceHub.myInfo.legalIdentities.first()
MDC.put("name", myIdentity.name.toString())
logger.info("Received a request from ${otherSideSession.counterparty.name}")
val stx = requestPayload.signedTransaction
subFlow(ResolveTransactionsFlow(stx, otherSideSession))
if (requestsReceived.getAndIncrement() == 0) {
logger.info("Ignoring")
// Waiting forever
stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
} else {
logger.info("Processing")
}
}
}
}

View File

@ -2,6 +2,7 @@ package net.corda.testing.node.internal
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
@ -243,7 +244,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
return InMemoryReceivedMessage(
message.topic,
OpaqueBytes(message.data.bytes.copyOf()), // Kryo messes with the buffer so give each client a unique copy
1,
PLATFORM_VERSION,
message.uniqueMessageId,
message.debugTimestamp,
sender.name