mirror of
https://github.com/corda/corda.git
synced 2025-01-14 16:59:52 +00:00
Merge remote-tracking branch 'open/master' into bogdan-merge-08032018
This commit is contained in:
commit
fd9098e7e2
@ -17,13 +17,16 @@ import net.corda.core.crypto.random63BitValue
|
|||||||
import net.corda.core.internal.concurrent.fork
|
import net.corda.core.internal.concurrent.fork
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
import net.corda.testing.internal.testThreadFactory
|
import net.corda.testing.internal.testThreadFactory
|
||||||
|
import net.corda.testing.node.User
|
||||||
import net.corda.testing.node.internal.*
|
import net.corda.testing.node.internal.*
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
@ -35,12 +38,15 @@ import rx.Observable
|
|||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import rx.subjects.UnicastSubject
|
import rx.subjects.UnicastSubject
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
import java.time.Instant
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import kotlin.concurrent.thread
|
||||||
|
import kotlin.test.assertNotNull
|
||||||
|
|
||||||
class RPCStabilityTests {
|
class RPCStabilityTests {
|
||||||
@Rule
|
@Rule
|
||||||
@ -263,6 +269,114 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `client reconnects to server and resends buffered messages`() {
|
||||||
|
rpcDriver(startNodesInProcess = false) {
|
||||||
|
var nodeInfo: NodeInfo? = null
|
||||||
|
var nodeTime: Instant? = null
|
||||||
|
val alice = startNode(providedName = ALICE_NAME,
|
||||||
|
rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow()
|
||||||
|
CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection ->
|
||||||
|
val proxy = connection.proxy
|
||||||
|
alice.stop()
|
||||||
|
val nodeInfoThread = thread {
|
||||||
|
nodeInfo = proxy.nodeInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
val currentTimeThread = thread {
|
||||||
|
nodeTime = proxy.currentNodeTime()
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(5000)
|
||||||
|
startNode(providedName = ALICE_NAME,
|
||||||
|
rpcUsers = listOf(User("alice", "alice", setOf("ALL"))),
|
||||||
|
customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}")))
|
||||||
|
currentTimeThread.join()
|
||||||
|
nodeInfoThread.join()
|
||||||
|
assertNotNull(nodeInfo)
|
||||||
|
assertNotNull(nodeTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `connection failover fails, rpc calls throw`() {
|
||||||
|
rpcDriver {
|
||||||
|
val ops = object : ReconnectOps {
|
||||||
|
override val protocolVersion = 0
|
||||||
|
override fun ping() = "pong"
|
||||||
|
}
|
||||||
|
|
||||||
|
val serverFollower = shutdownManager.follower()
|
||||||
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
|
serverFollower.unfollow()
|
||||||
|
// Set retry interval to 1s to reduce test duration
|
||||||
|
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
|
||||||
|
val clientFollower = shutdownManager.follower()
|
||||||
|
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||||
|
clientFollower.unfollow()
|
||||||
|
assertEquals("pong", client.ping())
|
||||||
|
serverFollower.shutdown()
|
||||||
|
try {
|
||||||
|
client.ping()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
assertTrue(e is RPCException)
|
||||||
|
}
|
||||||
|
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ThreadOps : RPCOps {
|
||||||
|
fun sendMessage(id: Int, msgNo: Int): String
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `multiple threads with 1000 messages for each thread`() {
|
||||||
|
val messageNo = 1000
|
||||||
|
val threadNo = 8
|
||||||
|
val ops = object : ThreadOps {
|
||||||
|
override val protocolVersion = 0
|
||||||
|
override fun sendMessage(id: Int, msgNo: Int): String {
|
||||||
|
return "($id-$msgNo)"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcDriver(startNodesInProcess = false) {
|
||||||
|
val serverFollower = shutdownManager.follower()
|
||||||
|
val serverPort = startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
|
||||||
|
ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
|
|
||||||
|
serverFollower.unfollow()
|
||||||
|
val proxy = RPCClient<ThreadOps>(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy
|
||||||
|
val expectedMap = mutableMapOf<Int, StringBuilder>()
|
||||||
|
val resultsMap = mutableMapOf<Int, StringBuilder>()
|
||||||
|
|
||||||
|
(1 until threadNo).forEach { nr ->
|
||||||
|
(1 until messageNo).forEach { msgNo ->
|
||||||
|
expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val threads = mutableMapOf<Int, Thread>()
|
||||||
|
(1 until threadNo).forEach { nr ->
|
||||||
|
val thread = thread {
|
||||||
|
(1 until messageNo).forEach { msgNo ->
|
||||||
|
resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
threads[nr] = thread
|
||||||
|
}
|
||||||
|
// give the threads a chance to start sending some messages
|
||||||
|
Thread.sleep(50)
|
||||||
|
serverFollower.shutdown()
|
||||||
|
startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
|
||||||
|
ops = ops, customPort = serverPort).getOrThrow()
|
||||||
|
threads.values.forEach { it.join() }
|
||||||
|
(1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
interface TrackSubscriberOps : RPCOps {
|
interface TrackSubscriberOps : RPCOps {
|
||||||
fun subscribe(): Observable<Unit>
|
fun subscribe(): Observable<Unit>
|
||||||
}
|
}
|
||||||
|
@ -171,6 +171,9 @@ class RPCClientProxyHandler(
|
|||||||
build()
|
build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Used to buffer client requests if the server is unavailable
|
||||||
|
private val outgoingRequestBuffer = ConcurrentHashMap<InvocationId, RPCApi.ClientToServer>()
|
||||||
|
|
||||||
private var sessionFactory: ClientSessionFactory? = null
|
private var sessionFactory: ClientSessionFactory? = null
|
||||||
private var producerSession: ClientSession? = null
|
private var producerSession: ClientSession? = null
|
||||||
private var consumerSession: ClientSession? = null
|
private var consumerSession: ClientSession? = null
|
||||||
@ -205,6 +208,7 @@ class RPCClientProxyHandler(
|
|||||||
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
|
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
|
||||||
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
|
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
|
||||||
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
|
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
|
||||||
|
producerSession!!.addFailoverListener(this::failoverHandler)
|
||||||
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
|
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
|
||||||
consumerSession!!.start()
|
consumerSession!!.start()
|
||||||
producerSession!!.start()
|
producerSession!!.start()
|
||||||
@ -238,8 +242,14 @@ class RPCClientProxyHandler(
|
|||||||
require(rpcReplyMap.put(replyId, replyFuture) == null) {
|
require(rpcReplyMap.put(replyId, replyFuture) == null) {
|
||||||
"Generated several RPC requests with same ID $replyId"
|
"Generated several RPC requests with same ID $replyId"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outgoingRequestBuffer[replyId] = request
|
||||||
|
// try and send the request
|
||||||
sendMessage(request)
|
sendMessage(request)
|
||||||
return replyFuture.getOrThrow()
|
val result = replyFuture.getOrThrow()
|
||||||
|
// at this point the server responded, remove the buffered request
|
||||||
|
outgoingRequestBuffer.remove(replyId)
|
||||||
|
return result
|
||||||
} catch (e: RuntimeException) {
|
} catch (e: RuntimeException) {
|
||||||
// Already an unchecked exception, so just rethrow it
|
// Already an unchecked exception, so just rethrow it
|
||||||
throw e
|
throw e
|
||||||
@ -403,6 +413,34 @@ class RPCClientProxyHandler(
|
|||||||
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
|
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun failoverHandler(event: FailoverEventType) {
|
||||||
|
when (event) {
|
||||||
|
FailoverEventType.FAILURE_DETECTED -> {
|
||||||
|
log.warn("RPC server unavailable. RPC calls are being buffered.")
|
||||||
|
}
|
||||||
|
|
||||||
|
FailoverEventType.FAILOVER_COMPLETED -> {
|
||||||
|
log.info("RPC server available. Draining request buffer.")
|
||||||
|
outgoingRequestBuffer.keys.forEach { replyId ->
|
||||||
|
outgoingRequestBuffer[replyId]?.let { sendMessage(it) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FailoverEventType.FAILOVER_FAILED -> {
|
||||||
|
log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " +
|
||||||
|
"will throw an RPCException.")
|
||||||
|
rpcReplyMap.forEach { id, replyFuture ->
|
||||||
|
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
|
||||||
|
val observable = observableContext.observableMap.getIfPresent(id)
|
||||||
|
observable?.onError(RPCException("Could not re-connect to RPC server. Failover failed."))
|
||||||
|
}
|
||||||
|
outgoingRequestBuffer.clear()
|
||||||
|
rpcReplyMap.clear()
|
||||||
|
callSiteMap?.clear()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>
|
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>
|
||||||
|
@ -19,7 +19,7 @@ import java.io.File
|
|||||||
|
|
||||||
class StandaloneShellArgsParserTest {
|
class StandaloneShellArgsParserTest {
|
||||||
|
|
||||||
private val CONFIG_FILE = File(javaClass.classLoader.getResource("config.conf")!!.file)
|
private val CONFIG_FILE = File(StandaloneShellArgsParserTest::class.java.getResource("/config.conf").toURI())
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun args_to_cmd_options() {
|
fun args_to_cmd_options() {
|
||||||
@ -43,8 +43,8 @@ class StandaloneShellArgsParserTest {
|
|||||||
"--keystore-type", "JKS")
|
"--keystore-type", "JKS")
|
||||||
|
|
||||||
val expectedOptions = CommandLineOptions(configFile = "/x/y/z/config.conf",
|
val expectedOptions = CommandLineOptions(configFile = "/x/y/z/config.conf",
|
||||||
commandsDirectory = Paths.get("/x/y/commands"),
|
commandsDirectory = Paths.get("/x/y/commands").normalize().toAbsolutePath(),
|
||||||
cordappsDirectory = Paths.get("/x/y/cordapps"),
|
cordappsDirectory = Paths.get("/x/y/cordapps").normalize().toAbsolutePath(),
|
||||||
host = "alocalhost",
|
host = "alocalhost",
|
||||||
port = "1234",
|
port = "1234",
|
||||||
user = "demo",
|
user = "demo",
|
||||||
@ -52,11 +52,11 @@ class StandaloneShellArgsParserTest {
|
|||||||
help = true,
|
help = true,
|
||||||
loggingLevel = Level.DEBUG,
|
loggingLevel = Level.DEBUG,
|
||||||
sshdPort = "2223",
|
sshdPort = "2223",
|
||||||
sshdHostKeyDirectory = Paths.get("/x/y/ssh"),
|
sshdHostKeyDirectory = Paths.get("/x/y/ssh").normalize().toAbsolutePath(),
|
||||||
keyStorePassword = "pass1",
|
keyStorePassword = "pass1",
|
||||||
trustStorePassword = "pass2",
|
trustStorePassword = "pass2",
|
||||||
keyStoreFile = Paths.get("/x/y/keystore.jks"),
|
keyStoreFile = Paths.get("/x/y/keystore.jks").normalize().toAbsolutePath(),
|
||||||
trustStoreFile = Paths.get("/x/y/truststore.jks"),
|
trustStoreFile = Paths.get("/x/y/truststore.jks").normalize().toAbsolutePath(),
|
||||||
trustStoreType = "dummy",
|
trustStoreType = "dummy",
|
||||||
keyStoreType = "JKS")
|
keyStoreType = "JKS")
|
||||||
|
|
||||||
@ -135,7 +135,7 @@ class StandaloneShellArgsParserTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun acmd_options_to_config_from_file() {
|
fun cmd_options_to_config_from_file() {
|
||||||
|
|
||||||
val options = CommandLineOptions(configFile = CONFIG_FILE.absolutePath,
|
val options = CommandLineOptions(configFile = CONFIG_FILE.absolutePath,
|
||||||
commandsDirectory = null,
|
commandsDirectory = null,
|
||||||
|
Loading…
Reference in New Issue
Block a user