mirror of
https://github.com/corda/corda.git
synced 2025-01-14 16:59:52 +00:00
Merge pull request #553 from corda/bogdan-merge-14
Bogdan OS-ENT merge 14-03-18
This commit is contained in:
commit
1cf1e2d6f0
@ -3943,7 +3943,7 @@ public final class net.corda.testing.driver.NotaryHandle extends java.lang.Objec
|
|||||||
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.NetworkHostAndPort nextHostAndPort()
|
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.NetworkHostAndPort nextHostAndPort()
|
||||||
public abstract int nextPort()
|
public abstract int nextPort()
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.driver.PortAllocation$Incremental extends net.corda.testing.driver.PortAllocation
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.driver.PortAllocation$Incremental extends net.corda.testing.driver.PortAllocation
|
||||||
public <init>(int)
|
public <init>(int)
|
||||||
@org.jetbrains.annotations.NotNull public final concurrent.atomic.AtomicInteger getPortCounter()
|
@org.jetbrains.annotations.NotNull public final concurrent.atomic.AtomicInteger getPortCounter()
|
||||||
public int nextPort()
|
public int nextPort()
|
||||||
@ -3968,7 +3968,7 @@ public final class net.corda.testing.driver.WebserverHandle extends java.lang.Ob
|
|||||||
public <init>()
|
public <init>()
|
||||||
public abstract int getClusterSize()
|
public abstract int getClusterSize()
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.node.ClusterSpec$Raft extends net.corda.testing.node.ClusterSpec
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.node.ClusterSpec$Raft extends net.corda.testing.node.ClusterSpec
|
||||||
public <init>(int)
|
public <init>(int)
|
||||||
public final int component1()
|
public final int component1()
|
||||||
@org.jetbrains.annotations.NotNull public final net.corda.testing.node.ClusterSpec$Raft copy(int)
|
@org.jetbrains.annotations.NotNull public final net.corda.testing.node.ClusterSpec$Raft copy(int)
|
||||||
@ -4032,13 +4032,13 @@ public static final class net.corda.testing.node.InMemoryMessagingNetwork$MockMe
|
|||||||
@net.corda.core.DoNotImplement public abstract static class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy extends java.lang.Object
|
@net.corda.core.DoNotImplement public abstract static class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy extends java.lang.Object
|
||||||
public abstract Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
public abstract Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$Random extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$Random extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
|
||||||
public <init>()
|
public <init>()
|
||||||
public <init>(SplittableRandom)
|
public <init>(SplittableRandom)
|
||||||
@org.jetbrains.annotations.NotNull public final SplittableRandom getRandom()
|
@org.jetbrains.annotations.NotNull public final SplittableRandom getRandom()
|
||||||
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$RoundRobin extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$RoundRobin extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
|
||||||
public <init>()
|
public <init>()
|
||||||
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
|
||||||
##
|
##
|
||||||
@ -4330,7 +4330,7 @@ public final class net.corda.client.rpc.CordaRPCClientConfiguration extends java
|
|||||||
##
|
##
|
||||||
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
|
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
|
||||||
##
|
##
|
||||||
public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection
|
@net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection
|
||||||
public <init>(net.corda.client.rpc.RPCConnection)
|
public <init>(net.corda.client.rpc.RPCConnection)
|
||||||
public void close()
|
public void close()
|
||||||
public void forceClose()
|
public void forceClose()
|
||||||
@ -4387,7 +4387,7 @@ public static final class net.corda.testing.contracts.DummyContract$Companion ex
|
|||||||
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(List, net.corda.core.identity.AbstractParty)
|
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(List, net.corda.core.identity.AbstractParty)
|
||||||
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(net.corda.core.contracts.StateAndRef, net.corda.core.identity.AbstractParty)
|
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(net.corda.core.contracts.StateAndRef, net.corda.core.identity.AbstractParty)
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.contracts.DummyContract$MultiOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.contracts.DummyContract$MultiOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State
|
||||||
public <init>(int, List)
|
public <init>(int, List)
|
||||||
public final int component1()
|
public final int component1()
|
||||||
@org.jetbrains.annotations.NotNull public final List component2()
|
@org.jetbrains.annotations.NotNull public final List component2()
|
||||||
@ -4399,7 +4399,7 @@ public static final class net.corda.testing.contracts.DummyContract$MultiOwnerSt
|
|||||||
public int hashCode()
|
public int hashCode()
|
||||||
public String toString()
|
public String toString()
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.contracts.DummyContract$SingleOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State, net.corda.core.contracts.OwnableState
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.contracts.DummyContract$SingleOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State, net.corda.core.contracts.OwnableState
|
||||||
public <init>(int, net.corda.core.identity.AbstractParty)
|
public <init>(int, net.corda.core.identity.AbstractParty)
|
||||||
public final int component1()
|
public final int component1()
|
||||||
@org.jetbrains.annotations.NotNull public final net.corda.core.identity.AbstractParty component2()
|
@org.jetbrains.annotations.NotNull public final net.corda.core.identity.AbstractParty component2()
|
||||||
@ -4475,15 +4475,15 @@ public final class net.corda.testing.core.Expect extends java.lang.Object
|
|||||||
##
|
##
|
||||||
@net.corda.core.DoNotImplement public abstract class net.corda.testing.core.ExpectCompose extends java.lang.Object
|
@net.corda.core.DoNotImplement public abstract class net.corda.testing.core.ExpectCompose extends java.lang.Object
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.core.ExpectCompose$Parallel extends net.corda.testing.core.ExpectCompose
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Parallel extends net.corda.testing.core.ExpectCompose
|
||||||
public <init>(List)
|
public <init>(List)
|
||||||
@org.jetbrains.annotations.NotNull public final List getParallel()
|
@org.jetbrains.annotations.NotNull public final List getParallel()
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.core.ExpectCompose$Sequential extends net.corda.testing.core.ExpectCompose
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Sequential extends net.corda.testing.core.ExpectCompose
|
||||||
public <init>(List)
|
public <init>(List)
|
||||||
@org.jetbrains.annotations.NotNull public final List getSequence()
|
@org.jetbrains.annotations.NotNull public final List getSequence()
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.core.ExpectCompose$Single extends net.corda.testing.core.ExpectCompose
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Single extends net.corda.testing.core.ExpectCompose
|
||||||
public <init>(net.corda.testing.core.Expect)
|
public <init>(net.corda.testing.core.Expect)
|
||||||
@org.jetbrains.annotations.NotNull public final net.corda.testing.core.Expect getExpect()
|
@org.jetbrains.annotations.NotNull public final net.corda.testing.core.Expect getExpect()
|
||||||
##
|
##
|
||||||
@ -4610,10 +4610,10 @@ public final class net.corda.testing.dsl.DuplicateOutputLabel extends net.corda.
|
|||||||
##
|
##
|
||||||
@net.corda.core.DoNotImplement public abstract class net.corda.testing.dsl.EnforceVerifyOrFail extends java.lang.Object
|
@net.corda.core.DoNotImplement public abstract class net.corda.testing.dsl.EnforceVerifyOrFail extends java.lang.Object
|
||||||
##
|
##
|
||||||
public static final class net.corda.testing.dsl.EnforceVerifyOrFail$Token extends net.corda.testing.dsl.EnforceVerifyOrFail
|
@net.corda.core.DoNotImplement public static final class net.corda.testing.dsl.EnforceVerifyOrFail$Token extends net.corda.testing.dsl.EnforceVerifyOrFail
|
||||||
public static final net.corda.testing.dsl.EnforceVerifyOrFail$Token INSTANCE
|
public static final net.corda.testing.dsl.EnforceVerifyOrFail$Token INSTANCE
|
||||||
##
|
##
|
||||||
public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
|
@net.corda.core.DoNotImplement public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
|
||||||
public <init>(net.corda.testing.dsl.LedgerDSLInterpreter, net.corda.core.identity.Party)
|
public <init>(net.corda.testing.dsl.LedgerDSLInterpreter, net.corda.core.identity.Party)
|
||||||
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
|
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
|
||||||
public void _tweak(kotlin.jvm.functions.Function1)
|
public void _tweak(kotlin.jvm.functions.Function1)
|
||||||
@ -4643,7 +4643,7 @@ public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object impl
|
|||||||
@net.corda.core.DoNotImplement public interface net.corda.testing.dsl.OutputStateLookup
|
@net.corda.core.DoNotImplement public interface net.corda.testing.dsl.OutputStateLookup
|
||||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.StateAndRef retrieveOutputStateAndRef(Class, String)
|
@org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.StateAndRef retrieveOutputStateAndRef(Class, String)
|
||||||
##
|
##
|
||||||
public final class net.corda.testing.dsl.TestLedgerDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
|
@net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TestLedgerDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
|
||||||
public <init>(net.corda.core.node.ServiceHub)
|
public <init>(net.corda.core.node.ServiceHub)
|
||||||
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
|
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
|
||||||
public void _tweak(kotlin.jvm.functions.Function1)
|
public void _tweak(kotlin.jvm.functions.Function1)
|
||||||
@ -4688,7 +4688,7 @@ public static final class net.corda.testing.dsl.TestLedgerDSLInterpreter$WireTra
|
|||||||
public int hashCode()
|
public int hashCode()
|
||||||
public String toString()
|
public String toString()
|
||||||
##
|
##
|
||||||
public final class net.corda.testing.dsl.TestTransactionDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.testing.dsl.OutputStateLookup
|
@net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TestTransactionDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.testing.dsl.OutputStateLookup
|
||||||
public <init>(net.corda.testing.dsl.TestLedgerDSLInterpreter, net.corda.core.transactions.TransactionBuilder)
|
public <init>(net.corda.testing.dsl.TestLedgerDSLInterpreter, net.corda.core.transactions.TransactionBuilder)
|
||||||
public void _attachment(String)
|
public void _attachment(String)
|
||||||
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)
|
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)
|
||||||
@ -4720,7 +4720,7 @@ public static final class net.corda.testing.dsl.TestTransactionDSLInterpreter$se
|
|||||||
@org.jetbrains.annotations.NotNull public net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef)
|
@org.jetbrains.annotations.NotNull public net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef)
|
||||||
@org.jetbrains.annotations.NotNull public Set loadStates(Set)
|
@org.jetbrains.annotations.NotNull public Set loadStates(Set)
|
||||||
##
|
##
|
||||||
public final class net.corda.testing.dsl.TransactionDSL extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter
|
@net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TransactionDSL extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter
|
||||||
public <init>(net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.core.identity.Party)
|
public <init>(net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.core.identity.Party)
|
||||||
public void _attachment(String)
|
public void _attachment(String)
|
||||||
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)
|
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)
|
||||||
|
@ -17,16 +17,13 @@ import net.corda.core.crypto.random63BitValue
|
|||||||
import net.corda.core.internal.concurrent.fork
|
import net.corda.core.internal.concurrent.fork
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.node.NodeInfo
|
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.testing.core.ALICE_NAME
|
|
||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
import net.corda.testing.internal.testThreadFactory
|
import net.corda.testing.internal.testThreadFactory
|
||||||
import net.corda.testing.node.User
|
|
||||||
import net.corda.testing.node.internal.*
|
import net.corda.testing.node.internal.*
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
@ -38,15 +35,12 @@ import rx.Observable
|
|||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import rx.subjects.UnicastSubject
|
import rx.subjects.UnicastSubject
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import kotlin.concurrent.thread
|
|
||||||
import kotlin.test.assertNotNull
|
|
||||||
|
|
||||||
class RPCStabilityTests {
|
class RPCStabilityTests {
|
||||||
@Rule
|
@Rule
|
||||||
@ -263,42 +257,13 @@ class RPCStabilityTests {
|
|||||||
assertEquals("pong", client.ping())
|
assertEquals("pong", client.ping())
|
||||||
serverFollower.shutdown()
|
serverFollower.shutdown()
|
||||||
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
|
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
|
||||||
|
Thread.sleep(1000) //wait for the server to come back up
|
||||||
val pingFuture = pool.fork(client::ping)
|
val pingFuture = pool.fork(client::ping)
|
||||||
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
|
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
|
||||||
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `client reconnects to server and resends buffered messages`() {
|
|
||||||
rpcDriver(startNodesInProcess = false) {
|
|
||||||
var nodeInfo: NodeInfo? = null
|
|
||||||
var nodeTime: Instant? = null
|
|
||||||
val alice = startNode(providedName = ALICE_NAME,
|
|
||||||
rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow()
|
|
||||||
CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection ->
|
|
||||||
val proxy = connection.proxy
|
|
||||||
alice.stop()
|
|
||||||
val nodeInfoThread = thread {
|
|
||||||
nodeInfo = proxy.nodeInfo()
|
|
||||||
}
|
|
||||||
|
|
||||||
val currentTimeThread = thread {
|
|
||||||
nodeTime = proxy.currentNodeTime()
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(5000)
|
|
||||||
startNode(providedName = ALICE_NAME,
|
|
||||||
rpcUsers = listOf(User("alice", "alice", setOf("ALL"))),
|
|
||||||
customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}")))
|
|
||||||
currentTimeThread.join()
|
|
||||||
nodeInfoThread.join()
|
|
||||||
assertNotNull(nodeInfo)
|
|
||||||
assertNotNull(nodeTime)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `connection failover fails, rpc calls throw`() {
|
fun `connection failover fails, rpc calls throw`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
@ -366,57 +331,6 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ThreadOps : RPCOps {
|
|
||||||
fun sendMessage(id: Int, msgNo: Int): String
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `multiple threads with 1000 messages for each thread`() {
|
|
||||||
val messageNo = 1000
|
|
||||||
val threadNo = 8
|
|
||||||
val ops = object : ThreadOps {
|
|
||||||
override val protocolVersion = 0
|
|
||||||
override fun sendMessage(id: Int, msgNo: Int): String {
|
|
||||||
return "($id-$msgNo)"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcDriver(startNodesInProcess = false) {
|
|
||||||
val serverFollower = shutdownManager.follower()
|
|
||||||
val serverPort = startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
|
|
||||||
ops = ops).getOrThrow().broker.hostAndPort!!
|
|
||||||
|
|
||||||
serverFollower.unfollow()
|
|
||||||
val proxy = RPCClient<ThreadOps>(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy
|
|
||||||
val expectedMap = mutableMapOf<Int, StringBuilder>()
|
|
||||||
val resultsMap = mutableMapOf<Int, StringBuilder>()
|
|
||||||
|
|
||||||
(1 until threadNo).forEach { nr ->
|
|
||||||
(1 until messageNo).forEach { msgNo ->
|
|
||||||
expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val threads = mutableMapOf<Int, Thread>()
|
|
||||||
(1 until threadNo).forEach { nr ->
|
|
||||||
val thread = thread {
|
|
||||||
(1 until messageNo).forEach { msgNo ->
|
|
||||||
resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
threads[nr] = thread
|
|
||||||
}
|
|
||||||
// give the threads a chance to start sending some messages
|
|
||||||
Thread.sleep(50)
|
|
||||||
serverFollower.shutdown()
|
|
||||||
startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
|
|
||||||
ops = ops, customPort = serverPort).getOrThrow()
|
|
||||||
threads.values.forEach { it.join() }
|
|
||||||
(1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) }
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
interface TrackSubscriberOps : RPCOps {
|
interface TrackSubscriberOps : RPCOps {
|
||||||
fun subscribe(): Observable<Unit>
|
fun subscribe(): Observable<Unit>
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,8 @@ import java.util.*
|
|||||||
import java.util.concurrent.*
|
import java.util.concurrent.*
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||||
|
import kotlin.concurrent.withLock
|
||||||
import kotlin.reflect.jvm.javaMethod
|
import kotlin.reflect.jvm.javaMethod
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -171,9 +173,6 @@ class RPCClientProxyHandler(
|
|||||||
build()
|
build()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used to buffer client requests if the server is unavailable
|
|
||||||
private val outgoingRequestBuffer = ConcurrentHashMap<InvocationId, RPCApi.ClientToServer>()
|
|
||||||
|
|
||||||
private var sessionFactory: ClientSessionFactory? = null
|
private var sessionFactory: ClientSessionFactory? = null
|
||||||
private var producerSession: ClientSession? = null
|
private var producerSession: ClientSession? = null
|
||||||
private var consumerSession: ClientSession? = null
|
private var consumerSession: ClientSession? = null
|
||||||
@ -183,6 +182,9 @@ class RPCClientProxyHandler(
|
|||||||
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
||||||
private val deduplicationSequenceNumber = AtomicLong(0)
|
private val deduplicationSequenceNumber = AtomicLong(0)
|
||||||
|
|
||||||
|
private val lock = ReentrantReadWriteLock()
|
||||||
|
private var sendingEnabled = true
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
|
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
|
||||||
*/
|
*/
|
||||||
@ -225,6 +227,11 @@ class RPCClientProxyHandler(
|
|||||||
throw RPCException("RPC Proxy is closed")
|
throw RPCException("RPC Proxy is closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lock.readLock().withLock {
|
||||||
|
if (!sendingEnabled)
|
||||||
|
throw RPCException("RPC server is not available.")
|
||||||
|
}
|
||||||
|
|
||||||
val replyId = InvocationId.newInstance()
|
val replyId = InvocationId.newInstance()
|
||||||
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
|
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
|
||||||
try {
|
try {
|
||||||
@ -243,13 +250,8 @@ class RPCClientProxyHandler(
|
|||||||
"Generated several RPC requests with same ID $replyId"
|
"Generated several RPC requests with same ID $replyId"
|
||||||
}
|
}
|
||||||
|
|
||||||
outgoingRequestBuffer[replyId] = request
|
|
||||||
// try and send the request
|
|
||||||
sendMessage(request)
|
sendMessage(request)
|
||||||
val result = replyFuture.getOrThrow()
|
return replyFuture.getOrThrow()
|
||||||
// at this point the server responded, remove the buffered request
|
|
||||||
outgoingRequestBuffer.remove(replyId)
|
|
||||||
return result
|
|
||||||
} catch (e: RuntimeException) {
|
} catch (e: RuntimeException) {
|
||||||
// Already an unchecked exception, so just rethrow it
|
// Already an unchecked exception, so just rethrow it
|
||||||
throw e
|
throw e
|
||||||
@ -417,8 +419,12 @@ class RPCClientProxyHandler(
|
|||||||
private fun failoverHandler(event: FailoverEventType) {
|
private fun failoverHandler(event: FailoverEventType) {
|
||||||
when (event) {
|
when (event) {
|
||||||
FailoverEventType.FAILURE_DETECTED -> {
|
FailoverEventType.FAILURE_DETECTED -> {
|
||||||
log.warn("RPC server unavailable. RPC calls are being buffered.")
|
lock.writeLock().withLock {
|
||||||
log.warn("Terminating observables.")
|
sendingEnabled = false
|
||||||
|
}
|
||||||
|
|
||||||
|
log.warn("RPC server unavailable.")
|
||||||
|
log.warn("Terminating observables and in flight RPCs.")
|
||||||
val m = observableContext.observableMap.asMap()
|
val m = observableContext.observableMap.asMap()
|
||||||
m.keys.forEach { k ->
|
m.keys.forEach { k ->
|
||||||
observationExecutorPool.run(k) {
|
observationExecutorPool.run(k) {
|
||||||
@ -426,24 +432,24 @@ class RPCClientProxyHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
observableContext.observableMap.invalidateAll()
|
observableContext.observableMap.invalidateAll()
|
||||||
|
|
||||||
|
rpcReplyMap.forEach { _, replyFuture ->
|
||||||
|
replyFuture.setException(RPCException("Connection failure detected."))
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcReplyMap.clear()
|
||||||
|
callSiteMap?.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
FailoverEventType.FAILOVER_COMPLETED -> {
|
FailoverEventType.FAILOVER_COMPLETED -> {
|
||||||
log.info("RPC server available. Draining request buffer.")
|
lock.writeLock().withLock {
|
||||||
outgoingRequestBuffer.keys.forEach { replyId ->
|
sendingEnabled = true
|
||||||
outgoingRequestBuffer[replyId]?.let { sendMessage(it) }
|
|
||||||
}
|
}
|
||||||
|
log.info("RPC server available.")
|
||||||
}
|
}
|
||||||
|
|
||||||
FailoverEventType.FAILOVER_FAILED -> {
|
FailoverEventType.FAILOVER_FAILED -> {
|
||||||
log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " +
|
log.error("Could not reconnect to the RPC server.")
|
||||||
"will throw an RPCException.")
|
|
||||||
rpcReplyMap.forEach { id, replyFuture ->
|
|
||||||
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
|
|
||||||
}
|
|
||||||
outgoingRequestBuffer.clear()
|
|
||||||
rpcReplyMap.clear()
|
|
||||||
callSiteMap?.clear()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
# Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
# Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||||
#
|
#
|
||||||
|
|
||||||
gradlePluginsVersion=4.0.6
|
gradlePluginsVersion=4.0.8
|
||||||
kotlinVersion=1.2.20
|
kotlinVersion=1.2.20
|
||||||
platformVersion=4
|
platformVersion=4
|
||||||
guavaVersion=21.0
|
guavaVersion=21.0
|
||||||
|
@ -23,8 +23,6 @@ functionality is provided in the docs for the ``proxy`` method.
|
|||||||
|
|
||||||
For a brief tutorial on using the RPC API, see :doc:`tutorial-clientrpc-api`.
|
For a brief tutorial on using the RPC API, see :doc:`tutorial-clientrpc-api`.
|
||||||
|
|
||||||
.. _rpc_security_mgmt_ref:
|
|
||||||
|
|
||||||
RPC permissions
|
RPC permissions
|
||||||
---------------
|
---------------
|
||||||
For a node's owner to interact with their node via RPC, they must define one or more RPC users. Each user is
|
For a node's owner to interact with their node via RPC, they must define one or more RPC users. Each user is
|
||||||
@ -128,7 +126,7 @@ You can provide an RPC user with the permission to perform any RPC operation (in
|
|||||||
...
|
...
|
||||||
]
|
]
|
||||||
|
|
||||||
.. _authentication_ref:
|
.. _rpc_security_mgmt_ref:
|
||||||
|
|
||||||
RPC security management
|
RPC security management
|
||||||
-----------------------
|
-----------------------
|
||||||
|
@ -22,9 +22,9 @@ HTTP network map protocol
|
|||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo``
|
If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo``
|
||||||
to the server (and each time it changes on startup) and then proceeds to download the entire network map. The network map
|
to the server at that URL (and each time it changes on startup) and then proceeds to download the entire network map from
|
||||||
consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map (based on the HTTP cache expiry
|
the same server. The network map consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map
|
||||||
header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache.
|
(based on the HTTP cache expiry header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache.
|
||||||
|
|
||||||
The set of REST end-points for the network map service are as follows.
|
The set of REST end-points for the network map service are as follows.
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
|||||||
import rx.Notification
|
import rx.Notification
|
||||||
import rx.exceptions.OnErrorNotImplementedException
|
import rx.exceptions.OnErrorNotImplementedException
|
||||||
import sun.security.x509.X509CertImpl
|
import sun.security.x509.X509CertImpl
|
||||||
|
import java.security.cert.CRLReason
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -72,6 +73,7 @@ object DefaultWhitelist : SerializationWhitelist {
|
|||||||
StackTraceElement::class.java,
|
StackTraceElement::class.java,
|
||||||
|
|
||||||
// Implementation of X509Certificate.
|
// Implementation of X509Certificate.
|
||||||
X509CertImpl::class.java
|
X509CertImpl::class.java,
|
||||||
|
CRLReason::class.java
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,7 @@ import com.typesafe.config.ConfigValueFactory
|
|||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.*
|
||||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.net.URL
|
import java.net.URL
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -198,6 +197,14 @@ class ConfigParsingTest {
|
|||||||
assertThat(NullableData(null).toConfig()).isEqualTo(empty())
|
assertThat(NullableData(null).toConfig()).isEqualTo(empty())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `data class with checks`() {
|
||||||
|
val config = config("positive" to -1)
|
||||||
|
assertThatExceptionOfType(IllegalArgumentException::class.java)
|
||||||
|
.isThrownBy { config.parseAs<PositiveData>() }
|
||||||
|
.withMessageContaining("-1")
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `old config property`() {
|
fun `old config property`() {
|
||||||
assertThat(config("oldValue" to "old").parseAs<OldData>().newValue).isEqualTo("old")
|
assertThat(config("oldValue" to "old").parseAs<OldData>().newValue).isEqualTo("old")
|
||||||
@ -301,6 +308,11 @@ class ConfigParsingTest {
|
|||||||
data class DataListData(val list: List<StringData>)
|
data class DataListData(val list: List<StringData>)
|
||||||
data class DefaultData(val a: Int, val defaultOfTwo: Int = 2)
|
data class DefaultData(val a: Int, val defaultOfTwo: Int = 2)
|
||||||
data class NullableData(val nullable: String?)
|
data class NullableData(val nullable: String?)
|
||||||
|
data class PositiveData(private val positive: Int) {
|
||||||
|
init {
|
||||||
|
require(positive > 0) { "$positive is not positive" }
|
||||||
|
}
|
||||||
|
}
|
||||||
data class OldData(
|
data class OldData(
|
||||||
@OldConfig("oldValue")
|
@OldConfig("oldValue")
|
||||||
val newValue: String)
|
val newValue: String)
|
||||||
|
@ -142,6 +142,7 @@ class InteractiveShellIntegrationTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
@Test
|
@Test
|
||||||
fun `ssh runs flows via standalone shell`() {
|
fun `ssh runs flows via standalone shell`() {
|
||||||
val user = User("u", "p", setOf(Permissions.startFlow<SSHServerTest.FlowICanRun>(),
|
val user = User("u", "p", setOf(Permissions.startFlow<SSHServerTest.FlowICanRun>(),
|
||||||
|
Loading…
Reference in New Issue
Block a user