mirror of
https://github.com/corda/corda.git
synced 2025-01-31 08:25:50 +00:00
Merge pull request #1021 from corda/merges/june-19-16-47
This commit is contained in:
commit
e46395d7d8
@ -15,7 +15,7 @@ class NotaryException(
|
|||||||
val error: NotaryError,
|
val error: NotaryError,
|
||||||
/** Id of the transaction to be notarised. Can be _null_ if an error occurred before the id could be resolved. */
|
/** Id of the transaction to be notarised. Can be _null_ if an error occurred before the id could be resolved. */
|
||||||
val txId: SecureHash? = null
|
val txId: SecureHash? = null
|
||||||
) : FlowException("Unable to notarise transaction${txId ?: " "}: $error")
|
) : FlowException("Unable to notarise transaction ${txId ?: "<Unknown>"} : $error")
|
||||||
|
|
||||||
/** Specifies the cause for notarisation request failure. */
|
/** Specifies the cause for notarisation request failure. */
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
@ -27,7 +27,9 @@ sealed class NotaryError {
|
|||||||
/** Specifies which states have already been consumed in another transaction. */
|
/** Specifies which states have already been consumed in another transaction. */
|
||||||
val consumedStates: Map<StateRef, StateConsumptionDetails>
|
val consumedStates: Map<StateRef, StateConsumptionDetails>
|
||||||
) : NotaryError() {
|
) : NotaryError() {
|
||||||
override fun toString() = "One or more input states have been used in another transaction"
|
override fun toString() = "Conflict notarising transaction $txId. " +
|
||||||
|
"Input states have been used in another transactions, count: ${consumedStates.size}, " +
|
||||||
|
"content: ${consumedStates.asSequence().joinToString(limit = 5) { it.key.toString() + "->" + it.value }}"
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */
|
/** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */
|
||||||
|
@ -95,4 +95,11 @@ sealed class FlowIORequest<out R : Any> {
|
|||||||
* Execute the specified [operation], suspend the flow until completion.
|
* Execute the specified [operation], suspend the flow until completion.
|
||||||
*/
|
*/
|
||||||
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
|
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates that no actual IO request occurred, and the flow should be resumed immediately.
|
||||||
|
* This is used for performing explicit checkpointing anywhere in a flow.
|
||||||
|
*/
|
||||||
|
// TODO: consider using an empty FlowAsyncOperation instead
|
||||||
|
object ForceCheckpoint : FlowIORequest<Unit>()
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import net.corda.core.cordapp.Cordapp
|
|||||||
import net.corda.core.cordapp.CordappConfig
|
import net.corda.core.cordapp.CordappConfig
|
||||||
import net.corda.core.cordapp.CordappContext
|
import net.corda.core.cordapp.CordappContext
|
||||||
import net.corda.core.crypto.*
|
import net.corda.core.crypto.*
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.node.ServicesForResolution
|
import net.corda.core.node.ServicesForResolution
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
@ -525,6 +526,11 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade
|
|||||||
|
|
||||||
val PublicKey.hash: SecureHash get() = encoded.sha256()
|
val PublicKey.hash: SecureHash get() = encoded.sha256()
|
||||||
|
|
||||||
|
/** Checks if this flow is an idempotent flow. */
|
||||||
|
fun Class<out FlowLogic<*>>.isIdempotentFlow(): Boolean {
|
||||||
|
return IdempotentFlow::class.java.isAssignableFrom(this)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extension method for providing a sumBy method that processes and returns a Long
|
* Extension method for providing a sumBy method that processes and returns a Long
|
||||||
*/
|
*/
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
|
|
||||||
package net.corda.core.messaging
|
package net.corda.core.messaging
|
||||||
|
|
||||||
|
import net.corda.core.CordaInternal
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.context.InvocationContext
|
import net.corda.core.context.InvocationContext
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
@ -209,6 +210,7 @@ interface CordaRPCOps : RPCOps {
|
|||||||
*
|
*
|
||||||
* TODO This method should be removed once SGX work is finalised and the design of the corresponding API using [FilteredTransaction] can be started
|
* TODO This method should be removed once SGX work is finalised and the design of the corresponding API using [FilteredTransaction] can be started
|
||||||
*/
|
*/
|
||||||
|
@CordaInternal
|
||||||
@Deprecated("This method is intended only for internal use and will be removed from the public API soon.")
|
@Deprecated("This method is intended only for internal use and will be removed from the public API soon.")
|
||||||
fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?
|
fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@ import net.corda.testing.core.SerializationEnvironmentRule
|
|||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class NotaryExceptionSerializationTest {
|
class NotaryExceptionSerializationTest {
|
||||||
@Rule
|
@Rule
|
||||||
@ -27,5 +28,6 @@ class NotaryExceptionSerializationTest {
|
|||||||
val instanceOnTheOtherSide = instance.serialize().bytes.deserialize<NotaryException>()
|
val instanceOnTheOtherSide = instance.serialize().bytes.deserialize<NotaryException>()
|
||||||
|
|
||||||
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
assertEquals(instance.error, instanceOnTheOtherSide.error)
|
||||||
|
assertTrue(instance.error.toString().contains("->"))
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -8,6 +8,9 @@ Unreleased
|
|||||||
==========
|
==========
|
||||||
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
|
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
|
||||||
|
|
||||||
|
* Introduced a grace period before the initial node registration fails if the node cannot connect to the Doorman.
|
||||||
|
It retries 10 times with a 1 minute interval in between each try. At the moment this is not configurable.
|
||||||
|
|
||||||
* Added a ``FlowMonitor`` to log information about flows that have been waiting for IO more than a configurable threshold.
|
* Added a ``FlowMonitor`` to log information about flows that have been waiting for IO more than a configurable threshold.
|
||||||
|
|
||||||
* H2 database changes:
|
* H2 database changes:
|
||||||
|
@ -62,6 +62,7 @@ open class CashPaymentFlow(
|
|||||||
val anonymousRecipient = txIdentities[recipient] ?: recipient
|
val anonymousRecipient = txIdentities[recipient] ?: recipient
|
||||||
progressTracker.currentStep = GENERATING_TX
|
progressTracker.currentStep = GENERATING_TX
|
||||||
val builder = TransactionBuilder(notary = null)
|
val builder = TransactionBuilder(notary = null)
|
||||||
|
logger.info("Generating spend for: ${builder.lockId}")
|
||||||
// TODO: Have some way of restricting this to states the caller controls
|
// TODO: Have some way of restricting this to states the caller controls
|
||||||
val (spendTX, keysForSigning) = try {
|
val (spendTX, keysForSigning) = try {
|
||||||
Cash.generateSpend(serviceHub,
|
Cash.generateSpend(serviceHub,
|
||||||
@ -75,10 +76,13 @@ open class CashPaymentFlow(
|
|||||||
}
|
}
|
||||||
|
|
||||||
progressTracker.currentStep = SIGNING_TX
|
progressTracker.currentStep = SIGNING_TX
|
||||||
|
logger.info("Signing transaction for: ${spendTX.lockId}")
|
||||||
val tx = serviceHub.signInitialTransaction(spendTX, keysForSigning)
|
val tx = serviceHub.signInitialTransaction(spendTX, keysForSigning)
|
||||||
|
|
||||||
progressTracker.currentStep = FINALISING_TX
|
progressTracker.currentStep = FINALISING_TX
|
||||||
|
logger.info("Finalising transaction for: ${tx.id}")
|
||||||
val notarised = finaliseTx(tx, setOf(recipient), "Unable to notarise spend")
|
val notarised = finaliseTx(tx, setOf(recipient), "Unable to notarise spend")
|
||||||
|
logger.info("Finalised transaction for: ${notarised.id}")
|
||||||
return Result(notarised, anonymousRecipient)
|
return Result(notarised, anonymousRecipient)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ import net.corda.node.services.config.shouldStartSSHDaemon
|
|||||||
import net.corda.node.services.transactions.bftSMaRtSerialFilter
|
import net.corda.node.services.transactions.bftSMaRtSerialFilter
|
||||||
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
|
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
|
||||||
import net.corda.node.utilities.registration.NodeRegistrationHelper
|
import net.corda.node.utilities.registration.NodeRegistrationHelper
|
||||||
|
import net.corda.node.utilities.registration.UnableToRegisterNodeWithDoormanException
|
||||||
import net.corda.nodeapi.internal.addShutdownHook
|
import net.corda.nodeapi.internal.addShutdownHook
|
||||||
import net.corda.nodeapi.internal.config.UnknownConfigurationKeysException
|
import net.corda.nodeapi.internal.config.UnknownConfigurationKeysException
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseMigrationException
|
import net.corda.nodeapi.internal.persistence.DatabaseMigrationException
|
||||||
@ -134,6 +135,9 @@ open class NodeStartup(val args: Array<String>) {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
logStartupInfo(versionInfo, cmdlineOptions, conf)
|
logStartupInfo(versionInfo, cmdlineOptions, conf)
|
||||||
|
} catch (e: UnableToRegisterNodeWithDoormanException) {
|
||||||
|
logger.warn("Node registration service is unavailable. Perhaps try to perform the initial registration again after a while.")
|
||||||
|
return false
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error("Exception during node registration", e)
|
logger.error("Exception during node registration", e)
|
||||||
return false
|
return false
|
||||||
|
@ -264,6 +264,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
||||||
|
checkpointIfSubflowIdempotent(subFlow.javaClass)
|
||||||
processEventImmediately(
|
processEventImmediately(
|
||||||
Event.EnterSubFlow(subFlow.javaClass,
|
Event.EnterSubFlow(subFlow.javaClass,
|
||||||
createSubFlowVersion(
|
createSubFlowVersion(
|
||||||
@ -284,6 +285,21 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the sub-flow is [IdempotentFlow] we need to perform a checkpoint to make sure any potentially side-effect
|
||||||
|
* generating logic between the last checkpoint and the sub-flow invocation does not get replayed if the
|
||||||
|
* flow restarts.
|
||||||
|
*
|
||||||
|
* We don't checkpoint if the current flow is [IdempotentFlow] as well.
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
private fun checkpointIfSubflowIdempotent(subFlow: Class<FlowLogic<*>>) {
|
||||||
|
val currentFlow = snapshot().checkpoint.subFlowStack.last().flowClass
|
||||||
|
if (!currentFlow.isIdempotentFlow() && subFlow.isIdempotentFlow()) {
|
||||||
|
suspend(FlowIORequest.ForceCheckpoint, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun initiateFlow(party: Party): FlowSession {
|
override fun initiateFlow(party: Party): FlowSession {
|
||||||
val resume = processEventImmediately(
|
val resume = processEventImmediately(
|
||||||
|
@ -206,8 +206,8 @@ class StaffedFlowHospital {
|
|||||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
||||||
return Diagnosis.DISCHARGE
|
return Diagnosis.DISCHARGE
|
||||||
} else {
|
} else {
|
||||||
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}." +
|
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
||||||
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and" +
|
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and " +
|
||||||
"unable to service requests fast enough. Please try again later."
|
"unable to service requests fast enough. Please try again later."
|
||||||
newError.setMessage(errorMsg)
|
newError.setMessage(errorMsg)
|
||||||
log.warn(errorMsg)
|
log.warn(errorMsg)
|
||||||
|
@ -16,22 +16,7 @@ import net.corda.core.flows.UnexpectedFlowEndException
|
|||||||
import net.corda.core.internal.FlowIORequest
|
import net.corda.core.internal.FlowIORequest
|
||||||
import net.corda.core.serialization.SerializedBytes
|
import net.corda.core.serialization.SerializedBytes
|
||||||
import net.corda.core.utilities.toNonEmptySet
|
import net.corda.core.utilities.toNonEmptySet
|
||||||
import net.corda.node.services.statemachine.Action
|
import net.corda.node.services.statemachine.*
|
||||||
import net.corda.node.services.statemachine.Checkpoint
|
|
||||||
import net.corda.node.services.statemachine.DataSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.DeduplicationId
|
|
||||||
import net.corda.node.services.statemachine.ExistingSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.FlowError
|
|
||||||
import net.corda.node.services.statemachine.FlowSessionImpl
|
|
||||||
import net.corda.node.services.statemachine.FlowState
|
|
||||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
|
||||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
|
||||||
import net.corda.node.services.statemachine.SessionId
|
|
||||||
import net.corda.node.services.statemachine.SessionMap
|
|
||||||
import net.corda.node.services.statemachine.SessionState
|
|
||||||
import net.corda.node.services.statemachine.StateMachineState
|
|
||||||
import net.corda.node.services.statemachine.SubFlow
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
||||||
@ -65,6 +50,7 @@ class StartedFlowTransition(
|
|||||||
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
||||||
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
||||||
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
||||||
|
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -410,6 +396,9 @@ class StartedFlowTransition(
|
|||||||
is FlowIORequest.ExecuteAsyncOperation<*> -> {
|
is FlowIORequest.ExecuteAsyncOperation<*> -> {
|
||||||
emptyList()
|
emptyList()
|
||||||
}
|
}
|
||||||
|
FlowIORequest.ForceCheckpoint -> {
|
||||||
|
emptyList()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -436,4 +425,8 @@ class StartedFlowTransition(
|
|||||||
FlowContinuation.ProcessEvents
|
FlowContinuation.ProcessEvents
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun executeForceCheckpoint(): TransitionResult {
|
||||||
|
return builder { resumeFlowLogic(Unit) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import java.net.URL
|
|||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.zip.ZipInputStream
|
import java.util.zip.ZipInputStream
|
||||||
|
import javax.naming.ServiceUnavailableException
|
||||||
|
|
||||||
class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistrationService {
|
class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistrationService {
|
||||||
private val registrationURL = URL("$compatibilityZoneURL/certificate")
|
private val registrationURL = URL("$compatibilityZoneURL/certificate")
|
||||||
@ -33,6 +34,7 @@ class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistr
|
|||||||
companion object {
|
companion object {
|
||||||
// TODO: Propagate version information from gradle
|
// TODO: Propagate version information from gradle
|
||||||
const val CLIENT_VERSION = "1.0"
|
const val CLIENT_VERSION = "1.0"
|
||||||
|
private val TRANSIENT_ERROR_STATUS_CODES = setOf(HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Throws(CertificateRequestException::class)
|
@Throws(CertificateRequestException::class)
|
||||||
@ -55,7 +57,8 @@ class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistr
|
|||||||
}
|
}
|
||||||
HTTP_NO_CONTENT -> CertificateResponse(pollInterval, null)
|
HTTP_NO_CONTENT -> CertificateResponse(pollInterval, null)
|
||||||
HTTP_UNAUTHORIZED -> throw CertificateRequestException("Certificate signing request has been rejected: ${conn.errorMessage}")
|
HTTP_UNAUTHORIZED -> throw CertificateRequestException("Certificate signing request has been rejected: ${conn.errorMessage}")
|
||||||
else -> throw IOException("Response Code ${conn.responseCode}: ${conn.errorMessage}")
|
in TRANSIENT_ERROR_STATUS_CODES -> throw ServiceUnavailableException("Could not connect with Doorman. Http response status code was ${conn.responseCode}.")
|
||||||
|
else -> throw IOException("Error while connecting to the Doorman. Http response status code was ${conn.responseCode}.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,12 +26,15 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
|
|||||||
import org.bouncycastle.asn1.x500.X500Name
|
import org.bouncycastle.asn1.x500.X500Name
|
||||||
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
|
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
|
||||||
import org.bouncycastle.util.io.pem.PemObject
|
import org.bouncycastle.util.io.pem.PemObject
|
||||||
|
import java.io.IOException
|
||||||
import java.io.StringWriter
|
import java.io.StringWriter
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.security.KeyPair
|
import java.security.KeyPair
|
||||||
import java.security.KeyStore
|
import java.security.KeyStore
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
||||||
|
import java.time.Duration
|
||||||
|
import javax.naming.ServiceUnavailableException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for managing the node registration process, which checks for any existing certificates and requests them if
|
* Helper for managing the node registration process, which checks for any existing certificates and requests them if
|
||||||
@ -45,7 +48,8 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
|
|||||||
private val networkRootTrustStorePath: Path,
|
private val networkRootTrustStorePath: Path,
|
||||||
networkRootTrustStorePassword: String,
|
networkRootTrustStorePassword: String,
|
||||||
private val keyAlias: String,
|
private val keyAlias: String,
|
||||||
private val certRole: CertRole) {
|
private val certRole: CertRole,
|
||||||
|
private val nextIdleDuration: (Duration?) -> Duration? = FixedPeriodLimitedRetrialStrategy(10, Duration.ofMinutes(1))) {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val SELF_SIGNED_PRIVATE_KEY = "Self Signed Private Key"
|
const val SELF_SIGNED_PRIVATE_KEY = "Self Signed Private Key"
|
||||||
@ -170,12 +174,22 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
|
|||||||
private fun pollServerForCertificates(requestId: String): List<X509Certificate> {
|
private fun pollServerForCertificates(requestId: String): List<X509Certificate> {
|
||||||
println("Start polling server for certificate signing approval.")
|
println("Start polling server for certificate signing approval.")
|
||||||
// Poll server to download the signed certificate once request has been approved.
|
// Poll server to download the signed certificate once request has been approved.
|
||||||
|
var idlePeriodDuration: Duration? = null
|
||||||
while (true) {
|
while (true) {
|
||||||
val (pollInterval, certificates) = certService.retrieveCertificates(requestId)
|
try {
|
||||||
if (certificates != null) {
|
val (pollInterval, certificates) = certService.retrieveCertificates(requestId)
|
||||||
return certificates
|
if (certificates != null) {
|
||||||
|
return certificates
|
||||||
|
}
|
||||||
|
Thread.sleep(pollInterval.toMillis())
|
||||||
|
} catch (e: ServiceUnavailableException) {
|
||||||
|
idlePeriodDuration = nextIdleDuration(idlePeriodDuration)
|
||||||
|
if (idlePeriodDuration != null) {
|
||||||
|
Thread.sleep(idlePeriodDuration.toMillis())
|
||||||
|
} else {
|
||||||
|
throw UnableToRegisterNodeWithDoormanException()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(pollInterval.toMillis())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +232,9 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
|
|||||||
protected open fun onSuccess(nodeCAKeyPair: KeyPair, certificates: List<X509Certificate>) {}
|
protected open fun onSuccess(nodeCAKeyPair: KeyPair, certificates: List<X509Certificate>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
class NodeRegistrationHelper(private val config: NodeConfiguration, certService: NetworkRegistrationService, regConfig: NodeRegistrationOption) :
|
class UnableToRegisterNodeWithDoormanException : IOException()
|
||||||
|
|
||||||
|
class NodeRegistrationHelper(private val config: NodeConfiguration, certService: NetworkRegistrationService, regConfig: NodeRegistrationOption, computeNextIdleDoormanConnectionPollInterval: (Duration?) -> Duration? = FixedPeriodLimitedRetrialStrategy(10, Duration.ofMinutes(1))) :
|
||||||
NetworkRegistrationHelper(config,
|
NetworkRegistrationHelper(config,
|
||||||
config.myLegalName,
|
config.myLegalName,
|
||||||
config.emailAddress,
|
config.emailAddress,
|
||||||
@ -226,7 +242,8 @@ class NodeRegistrationHelper(private val config: NodeConfiguration, certService:
|
|||||||
regConfig.networkRootTrustStorePath,
|
regConfig.networkRootTrustStorePath,
|
||||||
regConfig.networkRootTrustStorePassword,
|
regConfig.networkRootTrustStorePassword,
|
||||||
CORDA_CLIENT_CA,
|
CORDA_CLIENT_CA,
|
||||||
CertRole.NODE_CA) {
|
CertRole.NODE_CA,
|
||||||
|
computeNextIdleDoormanConnectionPollInterval) {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
val logger = contextLogger()
|
val logger = contextLogger()
|
||||||
@ -265,3 +282,18 @@ class NodeRegistrationHelper(private val config: NodeConfiguration, certService:
|
|||||||
println("Node trust store stored in ${config.trustStoreFile}.")
|
println("Node trust store stored in ${config.trustStoreFile}.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class FixedPeriodLimitedRetrialStrategy(times: Int, private val period: Duration) : (Duration?) -> Duration? {
|
||||||
|
|
||||||
|
init {
|
||||||
|
require(times > 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
private var counter = times
|
||||||
|
|
||||||
|
override fun invoke(@Suppress("UNUSED_PARAMETER") previousPeriod: Duration?): Duration? {
|
||||||
|
synchronized(this) {
|
||||||
|
return if (counter-- > 0) period else null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,93 @@
|
|||||||
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.InitiatingFlow
|
||||||
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.internal.IdempotentFlow
|
||||||
|
import net.corda.core.internal.TimedFlow
|
||||||
|
import net.corda.core.internal.packageName
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
|
import net.corda.node.internal.StartedNode
|
||||||
|
import net.corda.node.services.config.FlowTimeoutConfiguration
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
|
import net.corda.testing.node.internal.InternalMockNetwork
|
||||||
|
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||||
|
import net.corda.testing.node.internal.startFlow
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
class IdempotentFlowTests {
|
||||||
|
private lateinit var mockNet: InternalMockNetwork
|
||||||
|
private lateinit var nodeA: StartedNode<InternalMockNetwork.MockNode>
|
||||||
|
private lateinit var nodeB: StartedNode<InternalMockNetwork.MockNode>
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val executionCounter = AtomicInteger(0)
|
||||||
|
val subFlowExecutionCounter = AtomicInteger(0)
|
||||||
|
val suspendedOnce = AtomicBoolean(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun start() {
|
||||||
|
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName))
|
||||||
|
nodeA = mockNet.createNode(InternalMockNodeParameters(
|
||||||
|
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
|
||||||
|
configOverrides = {
|
||||||
|
conf: NodeConfiguration ->
|
||||||
|
val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
|
||||||
|
doReturn(retryConfig).whenever(conf).flowTimeout
|
||||||
|
}
|
||||||
|
))
|
||||||
|
nodeB = mockNet.createNode()
|
||||||
|
mockNet.startNodes()
|
||||||
|
executionCounter.set(0)
|
||||||
|
subFlowExecutionCounter.set(0)
|
||||||
|
suspendedOnce.set(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun cleanUp() {
|
||||||
|
mockNet.stopNodes()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `restarting idempotent flow does not replay any part of its parent flow`() {
|
||||||
|
nodeA.services.startFlow(SideEffectFlow()).resultFuture.get()
|
||||||
|
assertEquals(1, executionCounter.get())
|
||||||
|
assertEquals(2, subFlowExecutionCounter.get())
|
||||||
|
}
|
||||||
|
|
||||||
|
@InitiatingFlow
|
||||||
|
private class SideEffectFlow : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
executionCounter.incrementAndGet() // This shouldn't be replayed when the TimedSubFlow restarts.
|
||||||
|
subFlow(TimedSubflow()) // Checkpoint should be taken before invoking the sub-flow.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TimedSubflow : FlowLogic<Unit>(), TimedFlow {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
|
||||||
|
// so this should be replayed when TimedSubFlow restarts.
|
||||||
|
subFlow(IdempotentSubFlow()) // Checkpoint shouldn't be taken before invoking the sub-flow.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class IdempotentSubFlow : FlowLogic<Unit>(), IdempotentFlow {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
if (!IdempotentFlowTests.suspendedOnce.getAndSet(true))
|
||||||
|
waitForLedgerCommit(SecureHash.zeroHash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -32,11 +32,6 @@ sourceSets {
|
|||||||
|
|
||||||
processResources {
|
processResources {
|
||||||
from file("$rootDir/config/dev/log4j2.xml")
|
from file("$rootDir/config/dev/log4j2.xml")
|
||||||
from file("$rootDir/config/dev/jolokia-access.xml")
|
|
||||||
}
|
|
||||||
|
|
||||||
processTestResources {
|
|
||||||
from file("$rootDir/config/test/jolokia-access.xml")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user