diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle index 2e53526dba..ee557d933b 100644 --- a/client/rpc/build.gradle +++ b/client/rpc/build.gradle @@ -35,6 +35,9 @@ sourceSets { runtimeClasspath += main.output + test.output srcDir file('src/integration-test/java') } + resources { + srcDirs "src/integration-test/resources" + } } smokeTest { kotlin { diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index 8777d76303..60271d97a3 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -1,13 +1,22 @@ package net.corda.client.rpc -import net.corda.core.context.* +import net.corda.core.CordaRuntimeException +import net.corda.core.context.Actor +import net.corda.core.context.AuthServiceId +import net.corda.core.context.InvocationContext +import net.corda.core.context.InvocationOrigin +import net.corda.core.context.Trace import net.corda.core.contracts.FungibleAsset import net.corda.core.crypto.random63BitValue import net.corda.core.identity.Party import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.location import net.corda.core.internal.toPath -import net.corda.core.messaging.* +import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.StateMachineInfo +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.contextLogger @@ -22,8 +31,14 @@ import net.corda.finance.workflows.getCashBalance import net.corda.finance.workflows.getCashBalances import net.corda.node.internal.NodeWithInfo import net.corda.node.services.Permissions.Companion.all +import net.corda.nodeapi.exceptions.DuplicateAttachmentException import net.corda.testing.common.internal.checkNotOnClasspath -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.node.User import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.ProcessUtilities @@ -31,6 +46,7 @@ import net.corda.testing.node.internal.poll import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After import org.junit.Before import org.junit.Test @@ -38,7 +54,10 @@ import rx.subjects.PublishSubject import java.net.URLClassLoader import java.nio.file.Paths import java.util.* -import java.util.concurrent.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue @@ -47,6 +66,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = companion object { val rpcUser = User("user1", "test", permissions = setOf(all())) val log = contextLogger() + const val testJar = "net/corda/client/rpc/test.jar" } private lateinit var node: NodeWithInfo @@ -244,6 +264,29 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = assertThat(outOfProcessRpc.waitFor()).isZero() // i.e. no exceptions were thrown } + @Test + fun `nonspecific reconnect errors dont trigger graceful reconnect`() { + val inputJar1 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!! + val inputJar2 = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)!! + var disconnects = 0 + var reconnects = 0 + val gracefulReconnect = GracefulReconnect(onDisconnect = {++disconnects}, onReconnect = {++reconnects}) + + // This just recreates the original issue which allowed us to fix this. Any non-rpc exception would do + // https://r3-cev.atlassian.net/browse/CORDA-3572 + assertThatThrownBy { + client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).use { + val rpc = it.proxy + rpc.uploadAttachment(inputJar1) + rpc.uploadAttachment(inputJar2) + } + }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining(DuplicateAttachmentException::class.java.name) + + assertThat(disconnects).isEqualTo(0) + assertThat(reconnects).isEqualTo(0) + } + private fun checkShellNotification(info: StateMachineInfo) { val context = info.invocationContext assertThat(context.origin).isInstanceOf(InvocationOrigin.Shell::class.java) diff --git a/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar b/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar new file mode 100644 index 0000000000..9ce0b7d2d2 Binary files /dev/null and b/client/rpc/src/integration-test/resources/net/corda/client/rpc/test.jar differ diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 158788ee81..9fdfa6089a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -271,6 +271,7 @@ class ReconnectingCordaRPCOps private constructor( log.info("Could not establish connection. Next retry interval $nextInterval") return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries) } + override val proxy: CordaRPCOps get() = current.proxy override val serverProtocolVersion @@ -305,6 +306,7 @@ class ReconnectingCordaRPCOps private constructor( * * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. */ + @Suppress("ThrowsCount", "ComplexMethod") private fun doInvoke(method: Method, args: Array?, maxNumberOfAttempts: Int): Any? { checkIfClosed() var remainingAttempts = maxNumberOfAttempts @@ -338,9 +340,8 @@ class ReconnectingCordaRPCOps private constructor( throw RPCException("User does not have permission to perform operation ${method.name}.", e) } else -> { - log.warn("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) - reconnectingRPCConnection.reconnectOnError(e) - checkIfIsStartFlow(method, e) + log.warn("Failed to perform operation ${method.name}.", e) + throw e.targetException } } lastException = e.targetException