diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle index 2e53526dba..ee557d933b 100644 --- a/client/rpc/build.gradle +++ b/client/rpc/build.gradle @@ -35,6 +35,9 @@ sourceSets { runtimeClasspath += main.output + test.output srcDir file('src/integration-test/java') } + resources { + srcDirs "src/integration-test/resources" + } } smokeTest { kotlin { diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index 8777d76303..60271d97a3 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -1,13 +1,22 @@ package net.corda.client.rpc -import net.corda.core.context.* +import net.corda.core.CordaRuntimeException +import net.corda.core.context.Actor +import net.corda.core.context.AuthServiceId +import net.corda.core.context.InvocationContext +import net.corda.core.context.InvocationOrigin +import net.corda.core.context.Trace import net.corda.core.contracts.FungibleAsset import net.corda.core.crypto.random63BitValue import net.corda.core.identity.Party import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.location import net.corda.core.internal.toPath -import net.corda.core.messaging.* +import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.StateMachineInfo +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.contextLogger @@ -22,8 +31,14 @@ import net.corda.finance.workflows.getCashBalance import net.corda.finance.workflows.getCashBalances import net.corda.node.internal.NodeWithInfo import net.corda.node.services.Permissions.Companion.all +import net.corda.nodeapi.exceptions.DuplicateAttachmentException import net.corda.testing.common.internal.checkNotOnClasspath -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.node.User import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.ProcessUtilities @@ -31,6 +46,7 @@ import net.corda.testing.node.internal.poll import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After import org.junit.Before import org.junit.Test @@ -38,7 +54,10 @@ import rx.subjects.PublishSubject import java.net.URLClassLoader import java.nio.file.Paths import java.util.* -import java.util.concurrent.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue @@ -47,6 +66,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = companion object { val rpcUser = User("user1", "test", permissions = setOf(all())) val log = contextLogger() + const val testJar = "net/corda/client/rpc/test.jar" } private lateinit var node: NodeWithInfo @@ -244,6 +264,29 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = assertThat(outOfProcessRpc.waitFor()).isZero() // i.e. no exceptions were thrown } + @Test + fun `nonspecific reconnect errors dont trigger graceful reconnect`() { + val inputJar1 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!! + val inputJar2 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!! + var disconnects = 0 + var reconnects = 0 + val gracefulReconnect = GracefulReconnect(onDisconnect = {++disconnects}, onReconnect = {++reconnects}) + + // This just recreates the original issue which allowed us to fix this. Any non-rpc exception would do + // https://r3-cev.atlassian.net/browse/CORDA-3572 + assertThatThrownBy { + client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).use { + val rpc = it.proxy + rpc.uploadAttachment(inputJar1) + rpc.uploadAttachment(inputJar2) + } + }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining(DuplicateAttachmentException::class.java.name) + + assertThat(disconnects).isEqualTo(0) + assertThat(reconnects).isEqualTo(0) + } + private fun checkShellNotification(info: StateMachineInfo) { val context = info.invocationContext assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java) diff --git a/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar b/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar new file mode 100644 index 0000000000..9ce0b7d2d2 Binary files /dev/null and b/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar differ diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 9b4a76b579..9fdfa6089a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -17,6 +17,7 @@ import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConn import net.corda.client.rpc.reconnect.CouldNotStartFlowException import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.messaging.InternalCordaRPCOps +import net.corda.core.internal.min import net.corda.core.internal.times import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.ClientRpcSslOptions @@ -262,11 +263,15 @@ class ReconnectingCordaRPCOps private constructor( } // Could not connect this time round - pause before giving another try. Thread.sleep(retryInterval.toMillis()) - // TODO - make the exponential retry factor configurable. val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size - val nextInterval = retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier + val nextInterval = min( + rpcConfiguration.connectionMaxRetryInterval, + retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier + ) + log.info("Could not establish connection. Next retry interval $nextInterval") return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries) } + override val proxy: CordaRPCOps get() = current.proxy override val serverProtocolVersion @@ -301,6 +306,7 @@ class ReconnectingCordaRPCOps private constructor( * * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. */ + @Suppress("ThrowsCount", "ComplexMethod") private fun doInvoke(method: Method, args: Array?, maxNumberOfAttempts: Int): Any? { checkIfClosed() var remainingAttempts = maxNumberOfAttempts @@ -334,9 +340,8 @@ class ReconnectingCordaRPCOps private constructor( throw RPCException("User does not have permission to perform operation ${method.name}.", e) } else -> { - log.warn("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) - reconnectingRPCConnection.reconnectOnError(e) - checkIfIsStartFlow(method, e) + log.warn("Failed to perform operation ${method.name}.", e) + throw e.targetException } } lastException = e.targetException diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 77e27e2439..fcfd9a5986 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -4,7 +4,12 @@ package net.corda.core.internal import net.corda.core.DeleteForDJVM import net.corda.core.KeepForDJVM -import net.corda.core.crypto.* +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignedData +import net.corda.core.crypto.sha256 +import net.corda.core.crypto.sign import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize @@ -40,11 +45,23 @@ import java.security.KeyPair import java.security.MessageDigest import java.security.PrivateKey import java.security.PublicKey -import java.security.cert.* +import java.security.cert.CertPath +import java.security.cert.CertPathValidator +import java.security.cert.CertPathValidatorException +import java.security.cert.PKIXCertPathValidatorResult +import java.security.cert.PKIXParameters +import java.security.cert.TrustAnchor +import java.security.cert.X509Certificate import java.time.Duration import java.time.temporal.Temporal import java.util.* -import java.util.Spliterator.* +import java.util.Spliterator.DISTINCT +import java.util.Spliterator.IMMUTABLE +import java.util.Spliterator.NONNULL +import java.util.Spliterator.ORDERED +import java.util.Spliterator.SIZED +import java.util.Spliterator.SORTED +import java.util.Spliterator.SUBSIZED import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -78,6 +95,8 @@ infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(th operator fun Duration.div(divider: Long): Duration = dividedBy(divider) operator fun Duration.times(multiplicand: Long): Duration = multipliedBy(multiplicand) operator fun Duration.times(multiplicand: Double): Duration = Duration.ofNanos((toNanos() * multiplicand).roundToLong()) +fun min(d1: Duration, d2: Duration): Duration = if (d1 <= d2) d1 else d2 + /** * Returns the single element matching the given [predicate], or `null` if the collection is empty, or throws exception