Merge pull request #5961 from corda/rfowler-os4.4-os4.5-20200213

Rfowler os4.4 os4.5 20200213
This commit is contained in:
Matthew Nesbit 2020-02-13 17:53:43 +00:00 committed by GitHub
commit da7a5cce4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 12 deletions

View File

@ -35,6 +35,9 @@ sourceSets {
runtimeClasspath += main.output + test.output runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/java') srcDir file('src/integration-test/java')
} }
resources {
srcDirs "src/integration-test/resources"
}
} }
smokeTest { smokeTest {
kotlin { kotlin {

View File

@ -1,13 +1,22 @@
package net.corda.client.rpc package net.corda.client.rpc
import net.corda.core.context.* import net.corda.core.CordaRuntimeException
import net.corda.core.context.Actor
import net.corda.core.context.AuthServiceId
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.context.Trace
import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.FungibleAsset
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.location import net.corda.core.internal.location
import net.corda.core.internal.toPath import net.corda.core.internal.toPath
import net.corda.core.messaging.* import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
@ -22,8 +31,14 @@ import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances import net.corda.finance.workflows.getCashBalances
import net.corda.node.internal.NodeWithInfo import net.corda.node.internal.NodeWithInfo
import net.corda.node.services.Permissions.Companion.all import net.corda.node.services.Permissions.Companion.all
import net.corda.nodeapi.exceptions.DuplicateAttachmentException
import net.corda.testing.common.internal.checkNotOnClasspath import net.corda.testing.common.internal.checkNotOnClasspath
import net.corda.testing.core.* import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence
import net.corda.testing.node.User import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.NodeBasedTest
import net.corda.testing.node.internal.ProcessUtilities import net.corda.testing.node.internal.ProcessUtilities
@ -31,6 +46,7 @@ import net.corda.testing.node.internal.poll
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
@ -38,7 +54,10 @@ import rx.subjects.PublishSubject
import java.net.URLClassLoader import java.net.URLClassLoader
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFalse import kotlin.test.assertFalse
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -47,6 +66,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
companion object { companion object {
val rpcUser = User("user1", "test", permissions = setOf(all())) val rpcUser = User("user1", "test", permissions = setOf(all()))
val log = contextLogger() val log = contextLogger()
const val testJar = "net/corda/client/rpc/test.jar"
} }
private lateinit var node: NodeWithInfo private lateinit var node: NodeWithInfo
@ -244,6 +264,29 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
assertThat(outOfProcessRpc.waitFor()).isZero() // i.e. no exceptions were thrown assertThat(outOfProcessRpc.waitFor()).isZero() // i.e. no exceptions were thrown
} }
@Test
fun `nonspecific reconnect errors dont trigger graceful reconnect`() {
val inputJar1 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!!
val inputJar2 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!!
var disconnects = 0
var reconnects = 0
val gracefulReconnect = GracefulReconnect(onDisconnect = {++disconnects}, onReconnect = {++reconnects})
// This just recreates the original issue which allowed us to fix this. Any non-rpc exception would do
// https://r3-cev.atlassian.net/browse/CORDA-3572
assertThatThrownBy {
client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).use {
val rpc = it.proxy
rpc.uploadAttachment(inputJar1)
rpc.uploadAttachment(inputJar2)
}
}.isInstanceOf(CordaRuntimeException::class.java)
.hasMessageContaining(DuplicateAttachmentException::class.java.name)
assertThat(disconnects).isEqualTo(0)
assertThat(reconnects).isEqualTo(0)
}
private fun checkShellNotification(info: StateMachineInfo) { private fun checkShellNotification(info: StateMachineInfo) {
val context = info.invocationContext val context = info.invocationContext
assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java) assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java)

View File

@ -17,6 +17,7 @@ import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConn
import net.corda.client.rpc.reconnect.CouldNotStartFlowException import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.messaging.InternalCordaRPCOps import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.min
import net.corda.core.internal.times import net.corda.core.internal.times
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.ClientRpcSslOptions
@ -262,11 +263,15 @@ class ReconnectingCordaRPCOps private constructor(
} }
// Could not connect this time round - pause before giving another try. // Could not connect this time round - pause before giving another try.
Thread.sleep(retryInterval.toMillis()) Thread.sleep(retryInterval.toMillis())
// TODO - make the exponential retry factor configurable.
val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size
val nextInterval = retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier val nextInterval = min(
rpcConfiguration.connectionMaxRetryInterval,
retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier
)
log.info("Could not establish connection. Next retry interval $nextInterval")
return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries) return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries)
} }
override val proxy: CordaRPCOps override val proxy: CordaRPCOps
get() = current.proxy get() = current.proxy
override val serverProtocolVersion override val serverProtocolVersion
@ -301,6 +306,7 @@ class ReconnectingCordaRPCOps private constructor(
* *
* A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
*/ */
@Suppress("ThrowsCount", "ComplexMethod")
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? { private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
checkIfClosed() checkIfClosed()
var remainingAttempts = maxNumberOfAttempts var remainingAttempts = maxNumberOfAttempts
@ -334,9 +340,8 @@ class ReconnectingCordaRPCOps private constructor(
throw RPCException("User does not have permission to perform operation ${method.name}.", e) throw RPCException("User does not have permission to perform operation ${method.name}.", e)
} }
else -> { else -> {
log.warn("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) log.warn("Failed to perform operation ${method.name}.", e)
reconnectingRPCConnection.reconnectOnError(e) throw e.targetException
checkIfIsStartFlow(method, e)
} }
} }
lastException = e.targetException lastException = e.targetException

View File

@ -4,7 +4,12 @@ package net.corda.core.internal
import net.corda.core.DeleteForDJVM import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM import net.corda.core.KeepForDJVM
import net.corda.core.crypto.* import net.corda.core.crypto.Crypto
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sha256
import net.corda.core.crypto.sign
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -40,11 +45,23 @@ import java.security.KeyPair
import java.security.MessageDigest import java.security.MessageDigest
import java.security.PrivateKey import java.security.PrivateKey
import java.security.PublicKey import java.security.PublicKey
import java.security.cert.* import java.security.cert.CertPath
import java.security.cert.CertPathValidator
import java.security.cert.CertPathValidatorException
import java.security.cert.PKIXCertPathValidatorResult
import java.security.cert.PKIXParameters
import java.security.cert.TrustAnchor
import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import java.time.temporal.Temporal import java.time.temporal.Temporal
import java.util.* import java.util.*
import java.util.Spliterator.* import java.util.Spliterator.DISTINCT
import java.util.Spliterator.IMMUTABLE
import java.util.Spliterator.NONNULL
import java.util.Spliterator.ORDERED
import java.util.Spliterator.SIZED
import java.util.Spliterator.SORTED
import java.util.Spliterator.SUBSIZED
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.stream.Collectors import java.util.stream.Collectors
@ -78,6 +95,8 @@ infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(th
operator fun Duration.div(divider: Long): Duration = dividedBy(divider) operator fun Duration.div(divider: Long): Duration = dividedBy(divider)
operator fun Duration.times(multiplicand: Long): Duration = multipliedBy(multiplicand) operator fun Duration.times(multiplicand: Long): Duration = multipliedBy(multiplicand)
operator fun Duration.times(multiplicand: Double): Duration = Duration.ofNanos((toNanos() * multiplicand).roundToLong()) operator fun Duration.times(multiplicand: Double): Duration = Duration.ofNanos((toNanos() * multiplicand).roundToLong())
fun min(d1: Duration, d2: Duration): Duration = if (d1 <= d2) d1 else d2
/** /**
* Returns the single element matching the given [predicate], or `null` if the collection is empty, or throws exception * Returns the single element matching the given [predicate], or `null` if the collection is empty, or throws exception