mirror of
https://github.com/corda/corda.git
synced 2025-06-22 00:57:21 +00:00
Test that reliably fails with Artemis 2.1.0 (#797)
This commit is contained in:
@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.Serializer
|
|||||||
import com.esotericsoftware.kryo.io.Input
|
import com.esotericsoftware.kryo.io.Input
|
||||||
import com.esotericsoftware.kryo.io.Output
|
import com.esotericsoftware.kryo.io.Output
|
||||||
import com.esotericsoftware.kryo.pool.KryoPool
|
import com.esotericsoftware.kryo.pool.KryoPool
|
||||||
|
import com.google.common.base.Stopwatch
|
||||||
import com.google.common.net.HostAndPort
|
import com.google.common.net.HostAndPort
|
||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
@ -16,6 +17,7 @@ import net.corda.node.services.messaging.RPCServerConfiguration
|
|||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.nodeapi.RPCKryo
|
import net.corda.nodeapi.RPCKryo
|
||||||
import net.corda.testing.*
|
import net.corda.testing.*
|
||||||
|
import org.apache.activemq.artemis.ArtemisConstants
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.junit.Assert.assertEquals
|
import org.junit.Assert.assertEquals
|
||||||
import org.junit.Assert.assertTrue
|
import org.junit.Assert.assertTrue
|
||||||
@ -24,11 +26,9 @@ import rx.Observable
|
|||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import rx.subjects.UnicastSubject
|
import rx.subjects.UnicastSubject
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.*
|
||||||
import java.util.concurrent.Executors
|
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.concurrent.thread
|
||||||
|
|
||||||
|
|
||||||
class RPCStabilityTests {
|
class RPCStabilityTests {
|
||||||
@ -218,22 +218,50 @@ class RPCStabilityTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `client reconnects to rebooted server`() {
|
fun `client reconnects to rebooted server`() {
|
||||||
|
// Artemis 2.1.0 has a bug that makes this test fail, and 25 trials are needed to make it fail reliably.
|
||||||
|
// In the success case 25 trials take 2 minutes, so I've disabled them for the known-good Artemis version.
|
||||||
|
// TODO: Remove multiple trials when we fix the Artemis bug (which should have its own test(s)).
|
||||||
|
val trials = if (ArtemisConstants::class.java.`package`.implementationVersion == "1.5.3") 1 else 25
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val ops = object : ReconnectOps {
|
val coreBurner = thread {
|
||||||
override val protocolVersion = 0
|
while (!Thread.interrupted()) {
|
||||||
override fun ping() = "pong"
|
// Spin.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
val ops = object : ReconnectOps {
|
||||||
|
override val protocolVersion = 0
|
||||||
|
override fun ping() = "pong"
|
||||||
|
}
|
||||||
|
var serverFollower = shutdownManager.follower()
|
||||||
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
|
serverFollower.unfollow()
|
||||||
|
val clientFollower = shutdownManager.follower()
|
||||||
|
val client = startRpcClient<ReconnectOps>(serverPort).getOrThrow()
|
||||||
|
clientFollower.unfollow()
|
||||||
|
assertEquals("pong", client.ping())
|
||||||
|
val background = Executors.newSingleThreadExecutor()
|
||||||
|
(1..trials).forEach {
|
||||||
|
System.err.println("Start trial $it of $trials.")
|
||||||
|
serverFollower.shutdown()
|
||||||
|
serverFollower = shutdownManager.follower()
|
||||||
|
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
|
||||||
|
serverFollower.unfollow()
|
||||||
|
val stopwatch = Stopwatch.createStarted()
|
||||||
|
val pingFuture = background.submit(Callable {
|
||||||
|
client.ping() // Would also hang in foreground, we need it in background so we can timeout.
|
||||||
|
})
|
||||||
|
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
|
||||||
|
System.err.println("Took ${stopwatch.elapsed(TimeUnit.MILLISECONDS)} millis.")
|
||||||
|
}
|
||||||
|
background.shutdown() // No point in the hanging case.
|
||||||
|
clientFollower.shutdown() // Driver would do this after the current server, causing 'legit' failover hang.
|
||||||
|
} finally {
|
||||||
|
with(coreBurner) {
|
||||||
|
interrupt()
|
||||||
|
join()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val serverFollower = shutdownManager.follower()
|
|
||||||
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
|
||||||
serverFollower.unfollow()
|
|
||||||
val clientFollower = shutdownManager.follower()
|
|
||||||
val client = startRpcClient<ReconnectOps>(serverPort).getOrThrow()
|
|
||||||
clientFollower.unfollow()
|
|
||||||
assertEquals("pong", client.ping())
|
|
||||||
serverFollower.shutdown()
|
|
||||||
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
|
|
||||||
assertEquals("pong", client.ping())
|
|
||||||
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user