Merge remote-tracking branch 'open/master' into bogdan-merge-14

This commit is contained in:
bpaunescu 2018-03-14 15:01:27 +00:00
commit 0b12c5f602
9 changed files with 67 additions and 134 deletions

View File

@ -3943,7 +3943,7 @@ public final class net.corda.testing.driver.NotaryHandle extends java.lang.Objec
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.NetworkHostAndPort nextHostAndPort() @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.NetworkHostAndPort nextHostAndPort()
public abstract int nextPort() public abstract int nextPort()
## ##
public static final class net.corda.testing.driver.PortAllocation$Incremental extends net.corda.testing.driver.PortAllocation @net.corda.core.DoNotImplement public static final class net.corda.testing.driver.PortAllocation$Incremental extends net.corda.testing.driver.PortAllocation
public <init>(int) public <init>(int)
@org.jetbrains.annotations.NotNull public final concurrent.atomic.AtomicInteger getPortCounter() @org.jetbrains.annotations.NotNull public final concurrent.atomic.AtomicInteger getPortCounter()
public int nextPort() public int nextPort()
@ -3968,7 +3968,7 @@ public final class net.corda.testing.driver.WebserverHandle extends java.lang.Ob
public <init>() public <init>()
public abstract int getClusterSize() public abstract int getClusterSize()
## ##
public static final class net.corda.testing.node.ClusterSpec$Raft extends net.corda.testing.node.ClusterSpec @net.corda.core.DoNotImplement public static final class net.corda.testing.node.ClusterSpec$Raft extends net.corda.testing.node.ClusterSpec
public <init>(int) public <init>(int)
public final int component1() public final int component1()
@org.jetbrains.annotations.NotNull public final net.corda.testing.node.ClusterSpec$Raft copy(int) @org.jetbrains.annotations.NotNull public final net.corda.testing.node.ClusterSpec$Raft copy(int)
@ -4032,13 +4032,13 @@ public static final class net.corda.testing.node.InMemoryMessagingNetwork$MockMe
@net.corda.core.DoNotImplement public abstract static class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy extends java.lang.Object @net.corda.core.DoNotImplement public abstract static class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy extends java.lang.Object
public abstract Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List) public abstract Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
## ##
public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$Random extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy @net.corda.core.DoNotImplement public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$Random extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
public <init>() public <init>()
public <init>(SplittableRandom) public <init>(SplittableRandom)
@org.jetbrains.annotations.NotNull public final SplittableRandom getRandom() @org.jetbrains.annotations.NotNull public final SplittableRandom getRandom()
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List) public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
## ##
public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$RoundRobin extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy @net.corda.core.DoNotImplement public static final class net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy$RoundRobin extends net.corda.testing.node.InMemoryMessagingNetwork$ServicePeerAllocationStrategy
public <init>() public <init>()
public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List) public Object pickNext(net.corda.testing.node.InMemoryMessagingNetwork$DistributedServiceHandle, List)
## ##
@ -4330,7 +4330,7 @@ public final class net.corda.client.rpc.CordaRPCClientConfiguration extends java
## ##
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
## ##
public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection @net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection
public <init>(net.corda.client.rpc.RPCConnection) public <init>(net.corda.client.rpc.RPCConnection)
public void close() public void close()
public void forceClose() public void forceClose()
@ -4387,7 +4387,7 @@ public static final class net.corda.testing.contracts.DummyContract$Companion ex
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(List, net.corda.core.identity.AbstractParty) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(List, net.corda.core.identity.AbstractParty)
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(net.corda.core.contracts.StateAndRef, net.corda.core.identity.AbstractParty) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.TransactionBuilder move(net.corda.core.contracts.StateAndRef, net.corda.core.identity.AbstractParty)
## ##
public static final class net.corda.testing.contracts.DummyContract$MultiOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State @net.corda.core.DoNotImplement public static final class net.corda.testing.contracts.DummyContract$MultiOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State
public <init>(int, List) public <init>(int, List)
public final int component1() public final int component1()
@org.jetbrains.annotations.NotNull public final List component2() @org.jetbrains.annotations.NotNull public final List component2()
@ -4399,7 +4399,7 @@ public static final class net.corda.testing.contracts.DummyContract$MultiOwnerSt
public int hashCode() public int hashCode()
public String toString() public String toString()
## ##
public static final class net.corda.testing.contracts.DummyContract$SingleOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State, net.corda.core.contracts.OwnableState @net.corda.core.DoNotImplement public static final class net.corda.testing.contracts.DummyContract$SingleOwnerState extends java.lang.Object implements net.corda.testing.contracts.DummyContract$State, net.corda.core.contracts.OwnableState
public <init>(int, net.corda.core.identity.AbstractParty) public <init>(int, net.corda.core.identity.AbstractParty)
public final int component1() public final int component1()
@org.jetbrains.annotations.NotNull public final net.corda.core.identity.AbstractParty component2() @org.jetbrains.annotations.NotNull public final net.corda.core.identity.AbstractParty component2()
@ -4475,15 +4475,15 @@ public final class net.corda.testing.core.Expect extends java.lang.Object
## ##
@net.corda.core.DoNotImplement public abstract class net.corda.testing.core.ExpectCompose extends java.lang.Object @net.corda.core.DoNotImplement public abstract class net.corda.testing.core.ExpectCompose extends java.lang.Object
## ##
public static final class net.corda.testing.core.ExpectCompose$Parallel extends net.corda.testing.core.ExpectCompose @net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Parallel extends net.corda.testing.core.ExpectCompose
public <init>(List) public <init>(List)
@org.jetbrains.annotations.NotNull public final List getParallel() @org.jetbrains.annotations.NotNull public final List getParallel()
## ##
public static final class net.corda.testing.core.ExpectCompose$Sequential extends net.corda.testing.core.ExpectCompose @net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Sequential extends net.corda.testing.core.ExpectCompose
public <init>(List) public <init>(List)
@org.jetbrains.annotations.NotNull public final List getSequence() @org.jetbrains.annotations.NotNull public final List getSequence()
## ##
public static final class net.corda.testing.core.ExpectCompose$Single extends net.corda.testing.core.ExpectCompose @net.corda.core.DoNotImplement public static final class net.corda.testing.core.ExpectCompose$Single extends net.corda.testing.core.ExpectCompose
public <init>(net.corda.testing.core.Expect) public <init>(net.corda.testing.core.Expect)
@org.jetbrains.annotations.NotNull public final net.corda.testing.core.Expect getExpect() @org.jetbrains.annotations.NotNull public final net.corda.testing.core.Expect getExpect()
## ##
@ -4610,10 +4610,10 @@ public final class net.corda.testing.dsl.DuplicateOutputLabel extends net.corda.
## ##
@net.corda.core.DoNotImplement public abstract class net.corda.testing.dsl.EnforceVerifyOrFail extends java.lang.Object @net.corda.core.DoNotImplement public abstract class net.corda.testing.dsl.EnforceVerifyOrFail extends java.lang.Object
## ##
public static final class net.corda.testing.dsl.EnforceVerifyOrFail$Token extends net.corda.testing.dsl.EnforceVerifyOrFail @net.corda.core.DoNotImplement public static final class net.corda.testing.dsl.EnforceVerifyOrFail$Token extends net.corda.testing.dsl.EnforceVerifyOrFail
public static final net.corda.testing.dsl.EnforceVerifyOrFail$Token INSTANCE public static final net.corda.testing.dsl.EnforceVerifyOrFail$Token INSTANCE
## ##
public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter @net.corda.core.DoNotImplement public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
public <init>(net.corda.testing.dsl.LedgerDSLInterpreter, net.corda.core.identity.Party) public <init>(net.corda.testing.dsl.LedgerDSLInterpreter, net.corda.core.identity.Party)
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1) @org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
public void _tweak(kotlin.jvm.functions.Function1) public void _tweak(kotlin.jvm.functions.Function1)
@ -4643,7 +4643,7 @@ public final class net.corda.testing.dsl.LedgerDSL extends java.lang.Object impl
@net.corda.core.DoNotImplement public interface net.corda.testing.dsl.OutputStateLookup @net.corda.core.DoNotImplement public interface net.corda.testing.dsl.OutputStateLookup
@org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.StateAndRef retrieveOutputStateAndRef(Class, String) @org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.StateAndRef retrieveOutputStateAndRef(Class, String)
## ##
public final class net.corda.testing.dsl.TestLedgerDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter @net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TestLedgerDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.LedgerDSLInterpreter
public <init>(net.corda.core.node.ServiceHub) public <init>(net.corda.core.node.ServiceHub)
@org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1) @org.jetbrains.annotations.NotNull public net.corda.core.transactions.WireTransaction _transaction(String, net.corda.core.transactions.TransactionBuilder, kotlin.jvm.functions.Function1)
public void _tweak(kotlin.jvm.functions.Function1) public void _tweak(kotlin.jvm.functions.Function1)
@ -4688,7 +4688,7 @@ public static final class net.corda.testing.dsl.TestLedgerDSLInterpreter$WireTra
public int hashCode() public int hashCode()
public String toString() public String toString()
## ##
public final class net.corda.testing.dsl.TestTransactionDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.testing.dsl.OutputStateLookup @net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TestTransactionDSLInterpreter extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.testing.dsl.OutputStateLookup
public <init>(net.corda.testing.dsl.TestLedgerDSLInterpreter, net.corda.core.transactions.TransactionBuilder) public <init>(net.corda.testing.dsl.TestLedgerDSLInterpreter, net.corda.core.transactions.TransactionBuilder)
public void _attachment(String) public void _attachment(String)
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1) @org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)
@ -4720,7 +4720,7 @@ public static final class net.corda.testing.dsl.TestTransactionDSLInterpreter$se
@org.jetbrains.annotations.NotNull public net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef) @org.jetbrains.annotations.NotNull public net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef)
@org.jetbrains.annotations.NotNull public Set loadStates(Set) @org.jetbrains.annotations.NotNull public Set loadStates(Set)
## ##
public final class net.corda.testing.dsl.TransactionDSL extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter @net.corda.core.DoNotImplement public final class net.corda.testing.dsl.TransactionDSL extends java.lang.Object implements net.corda.testing.dsl.TransactionDSLInterpreter
public <init>(net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.core.identity.Party) public <init>(net.corda.testing.dsl.TransactionDSLInterpreter, net.corda.core.identity.Party)
public void _attachment(String) public void _attachment(String)
@org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1) @org.jetbrains.annotations.NotNull public net.corda.testing.dsl.EnforceVerifyOrFail _tweak(kotlin.jvm.functions.Function1)

View File

@ -17,16 +17,13 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.testThreadFactory import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.User
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After import org.junit.After
@ -38,15 +35,12 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread
import kotlin.test.assertNotNull
class RPCStabilityTests { class RPCStabilityTests {
@Rule @Rule
@ -263,42 +257,13 @@ class RPCStabilityTests {
assertEquals("pong", client.ping()) assertEquals("pong", client.ping())
serverFollower.shutdown() serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow() startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
Thread.sleep(1000) //wait for the server to come back up
val pingFuture = pool.fork(client::ping) val pingFuture = pool.fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds)) assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang. clientFollower.shutdown() // Driver would do this after the new server, causing hang.
} }
} }
@Test
fun `client reconnects to server and resends buffered messages`() {
rpcDriver(startNodesInProcess = false) {
var nodeInfo: NodeInfo? = null
var nodeTime: Instant? = null
val alice = startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow()
CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection ->
val proxy = connection.proxy
alice.stop()
val nodeInfoThread = thread {
nodeInfo = proxy.nodeInfo()
}
val currentTimeThread = thread {
nodeTime = proxy.currentNodeTime()
}
Thread.sleep(5000)
startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL"))),
customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}")))
currentTimeThread.join()
nodeInfoThread.join()
assertNotNull(nodeInfo)
assertNotNull(nodeTime)
}
}
}
@Test @Test
fun `connection failover fails, rpc calls throw`() { fun `connection failover fails, rpc calls throw`() {
rpcDriver { rpcDriver {
@ -366,57 +331,6 @@ class RPCStabilityTests {
} }
} }
interface ThreadOps : RPCOps {
fun sendMessage(id: Int, msgNo: Int): String
}
@Test
fun `multiple threads with 1000 messages for each thread`() {
val messageNo = 1000
val threadNo = 8
val ops = object : ThreadOps {
override val protocolVersion = 0
override fun sendMessage(id: Int, msgNo: Int): String {
return "($id-$msgNo)"
}
}
rpcDriver(startNodesInProcess = false) {
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val proxy = RPCClient<ThreadOps>(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy
val expectedMap = mutableMapOf<Int, StringBuilder>()
val resultsMap = mutableMapOf<Int, StringBuilder>()
(1 until threadNo).forEach { nr ->
(1 until messageNo).forEach { msgNo ->
expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)")
}
}
val threads = mutableMapOf<Int, Thread>()
(1 until threadNo).forEach { nr ->
val thread = thread {
(1 until messageNo).forEach { msgNo ->
resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo))
}
}
threads[nr] = thread
}
// give the threads a chance to start sending some messages
Thread.sleep(50)
serverFollower.shutdown()
startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops, customPort = serverPort).getOrThrow()
threads.values.forEach { it.join() }
(1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) }
}
}
interface TrackSubscriberOps : RPCOps { interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit> fun subscribe(): Observable<Unit>
} }

View File

@ -51,6 +51,8 @@ import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.reflect.jvm.javaMethod import kotlin.reflect.jvm.javaMethod
/** /**
@ -171,9 +173,6 @@ class RPCClientProxyHandler(
build() build()
} }
// Used to buffer client requests if the server is unavailable
private val outgoingRequestBuffer = ConcurrentHashMap<InvocationId, RPCApi.ClientToServer>()
private var sessionFactory: ClientSessionFactory? = null private var sessionFactory: ClientSessionFactory? = null
private var producerSession: ClientSession? = null private var producerSession: ClientSession? = null
private var consumerSession: ClientSession? = null private var consumerSession: ClientSession? = null
@ -183,6 +182,9 @@ class RPCClientProxyHandler(
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry) private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private val deduplicationSequenceNumber = AtomicLong(0) private val deduplicationSequenceNumber = AtomicLong(0)
private val lock = ReentrantReadWriteLock()
private var sendingEnabled = true
/** /**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper. * Start the client. This creates the per-client queue, starts the consumer session and the reaper.
*/ */
@ -225,6 +227,11 @@ class RPCClientProxyHandler(
throw RPCException("RPC Proxy is closed") throw RPCException("RPC Proxy is closed")
} }
lock.readLock().withLock {
if (!sendingEnabled)
throw RPCException("RPC server is not available.")
}
val replyId = InvocationId.newInstance() val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>")) callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
try { try {
@ -243,13 +250,8 @@ class RPCClientProxyHandler(
"Generated several RPC requests with same ID $replyId" "Generated several RPC requests with same ID $replyId"
} }
outgoingRequestBuffer[replyId] = request
// try and send the request
sendMessage(request) sendMessage(request)
val result = replyFuture.getOrThrow() return replyFuture.getOrThrow()
// at this point the server responded, remove the buffered request
outgoingRequestBuffer.remove(replyId)
return result
} catch (e: RuntimeException) { } catch (e: RuntimeException) {
// Already an unchecked exception, so just rethrow it // Already an unchecked exception, so just rethrow it
throw e throw e
@ -417,8 +419,12 @@ class RPCClientProxyHandler(
private fun failoverHandler(event: FailoverEventType) { private fun failoverHandler(event: FailoverEventType) {
when (event) { when (event) {
FailoverEventType.FAILURE_DETECTED -> { FailoverEventType.FAILURE_DETECTED -> {
log.warn("RPC server unavailable. RPC calls are being buffered.") lock.writeLock().withLock {
log.warn("Terminating observables.") sendingEnabled = false
}
log.warn("RPC server unavailable.")
log.warn("Terminating observables and in flight RPCs.")
val m = observableContext.observableMap.asMap() val m = observableContext.observableMap.asMap()
m.keys.forEach { k -> m.keys.forEach { k ->
observationExecutorPool.run(k) { observationExecutorPool.run(k) {
@ -426,24 +432,24 @@ class RPCClientProxyHandler(
} }
} }
observableContext.observableMap.invalidateAll() observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
}
rpcReplyMap.clear()
callSiteMap?.clear()
} }
FailoverEventType.FAILOVER_COMPLETED -> { FailoverEventType.FAILOVER_COMPLETED -> {
log.info("RPC server available. Draining request buffer.") lock.writeLock().withLock {
outgoingRequestBuffer.keys.forEach { replyId -> sendingEnabled = true
outgoingRequestBuffer[replyId]?.let { sendMessage(it) }
} }
log.info("RPC server available.")
} }
FailoverEventType.FAILOVER_FAILED -> { FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " + log.error("Could not reconnect to the RPC server.")
"will throw an RPCException.")
rpcReplyMap.forEach { id, replyFuture ->
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
}
outgoingRequestBuffer.clear()
rpcReplyMap.clear()
callSiteMap?.clear()
} }
} }
} }

View File

@ -8,7 +8,7 @@
# Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. # Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
# #
gradlePluginsVersion=4.0.6 gradlePluginsVersion=4.0.8
kotlinVersion=1.2.20 kotlinVersion=1.2.20
platformVersion=4 platformVersion=4
guavaVersion=21.0 guavaVersion=21.0

View File

@ -23,8 +23,6 @@ functionality is provided in the docs for the ``proxy`` method.
For a brief tutorial on using the RPC API, see :doc:`tutorial-clientrpc-api`. For a brief tutorial on using the RPC API, see :doc:`tutorial-clientrpc-api`.
.. _rpc_security_mgmt_ref:
RPC permissions RPC permissions
--------------- ---------------
For a node's owner to interact with their node via RPC, they must define one or more RPC users. Each user is For a node's owner to interact with their node via RPC, they must define one or more RPC users. Each user is
@ -128,7 +126,7 @@ You can provide an RPC user with the permission to perform any RPC operation (in
... ...
] ]
.. _authentication_ref: .. _rpc_security_mgmt_ref:
RPC security management RPC security management
----------------------- -----------------------

View File

@ -22,9 +22,9 @@ HTTP network map protocol
------------------------- -------------------------
If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo`` If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo``
to the server (and each time it changes on startup) and then proceeds to download the entire network map. The network map to the server at that URL (and each time it changes on startup) and then proceeds to download the entire network map from
consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map (based on the HTTP cache expiry the same server. The network map consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map
header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache. (based on the HTTP cache expiry header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache.
The set of REST end-points for the network map service are as follows. The set of REST end-points for the network map service are as follows.

View File

@ -17,6 +17,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
import rx.Notification import rx.Notification
import rx.exceptions.OnErrorNotImplementedException import rx.exceptions.OnErrorNotImplementedException
import sun.security.x509.X509CertImpl import sun.security.x509.X509CertImpl
import java.security.cert.CRLReason
import java.util.* import java.util.*
/** /**
@ -72,6 +73,7 @@ object DefaultWhitelist : SerializationWhitelist {
StackTraceElement::class.java, StackTraceElement::class.java,
// Implementation of X509Certificate. // Implementation of X509Certificate.
X509CertImpl::class.java X509CertImpl::class.java,
CRLReason::class.java
) )
} }

View File

@ -17,8 +17,7 @@ import com.typesafe.config.ConfigValueFactory
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.*
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test import org.junit.Test
import java.net.URL import java.net.URL
import java.nio.file.Path import java.nio.file.Path
@ -198,6 +197,14 @@ class ConfigParsingTest {
assertThat(NullableData(null).toConfig()).isEqualTo(empty()) assertThat(NullableData(null).toConfig()).isEqualTo(empty())
} }
@Test
fun `data class with checks`() {
val config = config("positive" to -1)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { config.parseAs<PositiveData>() }
.withMessageContaining("-1")
}
@Test @Test
fun `old config property`() { fun `old config property`() {
assertThat(config("oldValue" to "old").parseAs<OldData>().newValue).isEqualTo("old") assertThat(config("oldValue" to "old").parseAs<OldData>().newValue).isEqualTo("old")
@ -301,6 +308,11 @@ class ConfigParsingTest {
data class DataListData(val list: List<StringData>) data class DataListData(val list: List<StringData>)
data class DefaultData(val a: Int, val defaultOfTwo: Int = 2) data class DefaultData(val a: Int, val defaultOfTwo: Int = 2)
data class NullableData(val nullable: String?) data class NullableData(val nullable: String?)
data class PositiveData(private val positive: Int) {
init {
require(positive > 0) { "$positive is not positive" }
}
}
data class OldData( data class OldData(
@OldConfig("oldValue") @OldConfig("oldValue")
val newValue: String) val newValue: String)

View File

@ -142,6 +142,7 @@ class InteractiveShellIntegrationTest {
} }
} }
@Ignore
@Test @Test
fun `ssh runs flows via standalone shell`() { fun `ssh runs flows via standalone shell`() {
val user = User("u", "p", setOf(Permissions.startFlow<SSHServerTest.FlowICanRun>(), val user = User("u", "p", setOf(Permissions.startFlow<SSHServerTest.FlowICanRun>(),