Merge remote-tracking branch 'remotes/open/master' into merges/june-19-16-47

This commit is contained in:
sollecitom
2018-06-19 16:48:12 +01:00
15 changed files with 195 additions and 33 deletions

View File

@ -36,6 +36,7 @@ import net.corda.node.services.config.shouldStartSSHDaemon
import net.corda.node.services.transactions.bftSMaRtSerialFilter
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NodeRegistrationHelper
import net.corda.node.utilities.registration.UnableToRegisterNodeWithDoormanException
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.config.UnknownConfigurationKeysException
import net.corda.nodeapi.internal.persistence.DatabaseMigrationException
@ -134,6 +135,9 @@ open class NodeStartup(val args: Array<String>) {
return true
}
logStartupInfo(versionInfo, cmdlineOptions, conf)
} catch (e: UnableToRegisterNodeWithDoormanException) {
logger.warn("Node registration service is unavailable. Perhaps try to perform the initial registration again after a while.")
return false
} catch (e: Exception) {
logger.error("Exception during node registration", e)
return false

View File

@ -264,6 +264,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
checkpointIfSubflowIdempotent(subFlow.javaClass)
processEventImmediately(
Event.EnterSubFlow(subFlow.javaClass,
createSubFlowVersion(
@ -284,6 +285,21 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
/**
* If the sub-flow is [IdempotentFlow] we need to perform a checkpoint to make sure any potentially side-effect
* generating logic between the last checkpoint and the sub-flow invocation does not get replayed if the
* flow restarts.
*
* We don't checkpoint if the current flow is [IdempotentFlow] as well.
*/
@Suspendable
private fun checkpointIfSubflowIdempotent(subFlow: Class<FlowLogic<*>>) {
val currentFlow = snapshot().checkpoint.subFlowStack.last().flowClass
if (!currentFlow.isIdempotentFlow() && subFlow.isIdempotentFlow()) {
suspend(FlowIORequest.ForceCheckpoint, false)
}
}
@Suspendable
override fun initiateFlow(party: Party): FlowSession {
val resume = processEventImmediately(

View File

@ -206,8 +206,8 @@ class StaffedFlowHospital {
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
return Diagnosis.DISCHARGE
} else {
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}." +
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and" +
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and " +
"unable to service requests fast enough. Please try again later."
newError.setMessage(errorMsg)
log.warn(errorMsg)

View File

@ -16,22 +16,7 @@ import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.toNonEmptySet
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitialSessionMessage
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionMap
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.SubFlow
import net.corda.node.services.statemachine.*
/**
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
@ -65,6 +50,7 @@ class StartedFlowTransition(
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}
}
@ -410,6 +396,9 @@ class StartedFlowTransition(
is FlowIORequest.ExecuteAsyncOperation<*> -> {
emptyList()
}
FlowIORequest.ForceCheckpoint -> {
emptyList()
}
}
}
@ -436,4 +425,8 @@ class StartedFlowTransition(
FlowContinuation.ProcessEvents
}
}
private fun executeForceCheckpoint(): TransitionResult {
return builder { resumeFlowLogic(Unit) }
}
}

View File

@ -26,6 +26,7 @@ import java.net.URL
import java.security.cert.X509Certificate
import java.util.*
import java.util.zip.ZipInputStream
import javax.naming.ServiceUnavailableException
class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistrationService {
private val registrationURL = URL("$compatibilityZoneURL/certificate")
@ -33,6 +34,7 @@ class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistr
companion object {
// TODO: Propagate version information from gradle
const val CLIENT_VERSION = "1.0"
private val TRANSIENT_ERROR_STATUS_CODES = setOf(HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT)
}
@Throws(CertificateRequestException::class)
@ -55,7 +57,8 @@ class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistr
}
HTTP_NO_CONTENT -> CertificateResponse(pollInterval, null)
HTTP_UNAUTHORIZED -> throw CertificateRequestException("Certificate signing request has been rejected: ${conn.errorMessage}")
else -> throw IOException("Response Code ${conn.responseCode}: ${conn.errorMessage}")
in TRANSIENT_ERROR_STATUS_CODES -> throw ServiceUnavailableException("Could not connect with Doorman. Http response status code was ${conn.responseCode}.")
else -> throw IOException("Error while connecting to the Doorman. Http response status code was ${conn.responseCode}.")
}
}

View File

@ -26,12 +26,15 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
import org.bouncycastle.util.io.pem.PemObject
import java.io.IOException
import java.io.StringWriter
import java.nio.file.Path
import java.security.KeyPair
import java.security.KeyStore
import java.security.PublicKey
import java.security.cert.X509Certificate
import java.time.Duration
import javax.naming.ServiceUnavailableException
/**
* Helper for managing the node registration process, which checks for any existing certificates and requests them if
@ -45,7 +48,8 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
private val networkRootTrustStorePath: Path,
networkRootTrustStorePassword: String,
private val keyAlias: String,
private val certRole: CertRole) {
private val certRole: CertRole,
private val nextIdleDuration: (Duration?) -> Duration? = FixedPeriodLimitedRetrialStrategy(10, Duration.ofMinutes(1))) {
companion object {
const val SELF_SIGNED_PRIVATE_KEY = "Self Signed Private Key"
@ -170,12 +174,22 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
private fun pollServerForCertificates(requestId: String): List<X509Certificate> {
println("Start polling server for certificate signing approval.")
// Poll server to download the signed certificate once request has been approved.
var idlePeriodDuration: Duration? = null
while (true) {
val (pollInterval, certificates) = certService.retrieveCertificates(requestId)
if (certificates != null) {
return certificates
try {
val (pollInterval, certificates) = certService.retrieveCertificates(requestId)
if (certificates != null) {
return certificates
}
Thread.sleep(pollInterval.toMillis())
} catch (e: ServiceUnavailableException) {
idlePeriodDuration = nextIdleDuration(idlePeriodDuration)
if (idlePeriodDuration != null) {
Thread.sleep(idlePeriodDuration.toMillis())
} else {
throw UnableToRegisterNodeWithDoormanException()
}
}
Thread.sleep(pollInterval.toMillis())
}
}
@ -218,7 +232,9 @@ open class NetworkRegistrationHelper(private val config: SSLConfiguration,
protected open fun onSuccess(nodeCAKeyPair: KeyPair, certificates: List<X509Certificate>) {}
}
class NodeRegistrationHelper(private val config: NodeConfiguration, certService: NetworkRegistrationService, regConfig: NodeRegistrationOption) :
class UnableToRegisterNodeWithDoormanException : IOException()
class NodeRegistrationHelper(private val config: NodeConfiguration, certService: NetworkRegistrationService, regConfig: NodeRegistrationOption, computeNextIdleDoormanConnectionPollInterval: (Duration?) -> Duration? = FixedPeriodLimitedRetrialStrategy(10, Duration.ofMinutes(1))) :
NetworkRegistrationHelper(config,
config.myLegalName,
config.emailAddress,
@ -226,7 +242,8 @@ class NodeRegistrationHelper(private val config: NodeConfiguration, certService:
regConfig.networkRootTrustStorePath,
regConfig.networkRootTrustStorePassword,
CORDA_CLIENT_CA,
CertRole.NODE_CA) {
CertRole.NODE_CA,
computeNextIdleDoormanConnectionPollInterval) {
companion object {
val logger = contextLogger()
@ -265,3 +282,18 @@ class NodeRegistrationHelper(private val config: NodeConfiguration, certService:
println("Node trust store stored in ${config.trustStoreFile}.")
}
}
private class FixedPeriodLimitedRetrialStrategy(times: Int, private val period: Duration) : (Duration?) -> Duration? {
init {
require(times > 0)
}
private var counter = times
override fun invoke(@Suppress("UNUSED_PARAMETER") previousPeriod: Duration?): Duration? {
synchronized(this) {
return if (counter-- > 0) period else null
}
}
}

View File

@ -0,0 +1,93 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.packageName
import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
class IdempotentFlowTests {
private lateinit var mockNet: InternalMockNetwork
private lateinit var nodeA: StartedNode<InternalMockNetwork.MockNode>
private lateinit var nodeB: StartedNode<InternalMockNetwork.MockNode>
companion object {
val executionCounter = AtomicInteger(0)
val subFlowExecutionCounter = AtomicInteger(0)
val suspendedOnce = AtomicBoolean(false)
}
@Before
fun start() {
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName))
nodeA = mockNet.createNode(InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = {
conf: NodeConfiguration ->
val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
doReturn(retryConfig).whenever(conf).flowTimeout
}
))
nodeB = mockNet.createNode()
mockNet.startNodes()
executionCounter.set(0)
subFlowExecutionCounter.set(0)
suspendedOnce.set(false)
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `restarting idempotent flow does not replay any part of its parent flow`() {
nodeA.services.startFlow(SideEffectFlow()).resultFuture.get()
assertEquals(1, executionCounter.get())
assertEquals(2, subFlowExecutionCounter.get())
}
@InitiatingFlow
private class SideEffectFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
executionCounter.incrementAndGet() // This shouldn't be replayed when the TimedSubFlow restarts.
subFlow(TimedSubflow()) // Checkpoint should be taken before invoking the sub-flow.
}
}
private class TimedSubflow : FlowLogic<Unit>(), TimedFlow {
@Suspendable
override fun call() {
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
// so this should be replayed when TimedSubFlow restarts.
subFlow(IdempotentSubFlow()) // Checkpoint shouldn't be taken before invoking the sub-flow.
}
}
private class IdempotentSubFlow : FlowLogic<Unit>(), IdempotentFlow {
@Suspendable
override fun call() {
if (!IdempotentFlowTests.suspendedOnce.getAndSet(true))
waitForLedgerCommit(SecureHash.zeroHash)
}
}
}