Merge branch 'release/os/4.4' of github.com:corda/corda into new_checkpoint_schema

This commit is contained in:
stefano 2020-02-10 11:25:29 +00:00
commit 546166e057
24 changed files with 129 additions and 2519 deletions

View File

@ -5,7 +5,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
agent { label 'local-k8s' }
options { timestamps() }
environment {

View File

@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
agent { label 'local-k8s' }
options {
timestamps()
overrideIndexTriggers(false)

View File

@ -3,4 +3,4 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
onDemandTestPipeline('k8s', '.ci/dev/on-demand-tests/commentMappings.yml')
onDemandTestPipeline('local-k8s', '.ci/dev/on-demand-tests/commentMappings.yml')

View File

@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'gke' }
agent { label 'local-k8s' }
options {
timestamps()
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))

View File

@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
agent { label 'local-k8s' }
options { timestamps()
overrideIndexTriggers(false) }

View File

@ -5,7 +5,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
agent { label 'local-k8s' }
options { timestamps() }
environment {

View File

@ -193,7 +193,7 @@ plugins {
id 'com.github.johnrengelman.shadow' version '2.0.4' apply false
id "com.gradle.build-scan" version "2.2.1"
id "org.ajoberstar.grgit" version "4.0.0"
}
}
apply plugin: 'project-report'
apply plugin: 'com.github.ben-manes.versions'
@ -660,9 +660,9 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
numberOfShards 6
numberOfShards 15
streamOutput false
coresPerFork 6
coresPerFork 2
memoryInGbPerFork 10
distribute DistributeTestsBy.METHOD
nodeTaints "big"

View File

@ -7,7 +7,6 @@ import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
@ -26,7 +25,6 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.rpcDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.ClassRule
import org.junit.Test
import java.lang.Thread.sleep
import java.time.Duration
@ -41,6 +39,10 @@ class CordaRPCClientReconnectionTest {
private val portAllocator = incrementalPortAllocation()
private val gracefulReconnect = GracefulReconnect()
private val config = CordaRPCClientConfiguration.DEFAULT.copy(
connectionRetryInterval = Duration.ofSeconds(1),
connectionRetryIntervalMultiplier = 1.0
)
companion object {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
@ -61,7 +63,7 @@ class CordaRPCClientReconnectionTest {
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress)
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
@ -99,7 +101,7 @@ class CordaRPCClientReconnectionTest {
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress)
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
@ -138,7 +140,7 @@ class CordaRPCClientReconnectionTest {
val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))
val node = startNode(addresses[0])
val client = CordaRPCClient(addresses)
val client = CordaRPCClient(addresses, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
@ -175,7 +177,7 @@ class CordaRPCClientReconnectionTest {
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress)
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = GracefulReconnect(maxAttempts = 1))).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
@ -189,11 +191,11 @@ class CordaRPCClientReconnectionTest {
}
}
@Test
@Test(timeout = 60_000)
fun `establishing an RPC connection fails if there is no node listening to the specified address`() {
rpcDriver {
assertThatThrownBy {
CordaRPCClient(NetworkHostAndPort("localhost", portAllocator.nextPort()))
CordaRPCClient(NetworkHostAndPort("localhost", portAllocator.nextPort()), config)
.start(rpcUser.username, rpcUser.password, GracefulReconnect())
}.isInstanceOf(RPCException::class.java)
.hasMessage("Cannot connect to server(s). Tried with all available servers.")
@ -213,7 +215,7 @@ class CordaRPCClientReconnectionTest {
}
val node = startNode()
CordaRPCClient(node.rpcAddress).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
CordaRPCClient(node.rpcAddress, config).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
node.stop()
thread() {
it.proxy.startTrackedFlow(
@ -230,4 +232,23 @@ class CordaRPCClientReconnectionTest {
}
}
@Test
fun `RPC connection stops reconnecting after config number of retries`() {
driver(DriverParameters(cordappsForAllNodes = emptyList())) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
val conf = config.copy(maxReconnectAttempts = 2)
fun startNode(): NodeHandle = startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
val node = startNode()
val connection = CordaRPCClient(node.rpcAddress, conf).start(rpcUser.username, rpcUser.password, gracefulReconnect)
node.stop()
// After two tries we throw RPCException
assertThatThrownBy { connection.proxy.isWaitingForShutdown() }
.isInstanceOf(RPCException::class.java)
}
}
}

View File

@ -285,8 +285,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
*
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
* @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite number of retries.
* The default value is 5.
* @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite
* number of retries. The default value is 5.
*/
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
@Suppress("unused") // constructor for java

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc
import net.corda.core.CordaRuntimeException
import java.lang.reflect.Method
/**
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked method.
@ -19,8 +20,8 @@ open class UnrecoverableRPCException(message: String?, cause: Throwable? = null)
* @param maxNumberOfRetries the number of retries that had been performed.
* @param cause the cause of the last failed attempt.
*/
class MaxRpcRetryException(maxNumberOfRetries: Int, cause: Throwable?):
RPCException("Max number of retries ($maxNumberOfRetries) was reached.", cause)
class MaxRpcRetryException(maxNumberOfRetries: Int, method: Method, cause: Throwable?):
RPCException("Max number of retries ($maxNumberOfRetries) for this RPC operation (${method.name}) was reached.", cause)
/**
* Signals that the underlying [RPCConnection] dropped.

View File

@ -16,7 +16,6 @@ import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConn
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.times
import net.corda.core.internal.uncheckedCast
@ -154,7 +153,7 @@ class ReconnectingCordaRPCOps private constructor(
@Synchronized get() = when (currentState) {
// The first attempt to establish a connection will try every address only once.
UNCONNECTED ->
connect(infiniteRetries = false) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
connect(nodeHostAndPorts.size) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
CONNECTED ->
currentRPCConnection!!
CLOSED ->
@ -180,7 +179,7 @@ class ReconnectingCordaRPCOps private constructor(
//TODO - handle error cases
log.warn("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e)
connect(infiniteRetries = true)
connect(rpcConfiguration.maxReconnectAttempts)
previousConnection?.forceClose()
gracefulReconnect.onReconnect.invoke()
}
@ -192,14 +191,13 @@ class ReconnectingCordaRPCOps private constructor(
val previousConnection = currentRPCConnection
doReconnect(e, previousConnection)
}
private fun connect(infiniteRetries: Boolean): CordaRPCConnection? {
private fun connect(maxConnectAttempts: Int): CordaRPCConnection? {
currentState = CONNECTING
synchronized(this) {
currentRPCConnection = if (infiniteRetries) {
establishConnectionWithRetry(rpcConfiguration.connectionRetryInterval)
} else {
establishConnectionWithRetry(rpcConfiguration.connectionRetryInterval, retries = nodeHostAndPorts.size)
}
currentRPCConnection = establishConnectionWithRetry(
rpcConfiguration.connectionRetryInterval,
retries = maxConnectAttempts
)
// It's possible we could get closed while waiting for the connection to establish.
if (!isClosed()) {
currentState = CONNECTED
@ -257,17 +255,17 @@ class ReconnectingCordaRPCOps private constructor(
log.warn("Unknown exception [${ex.javaClass.name}] upon establishing connection.", ex)
}
}
if (retries == 0) {
throw RPCException("Cannot connect to server(s). Tried with all available servers.", ex)
}
val remainingRetries = if (retries < 0) retries else (retries - 1)
if (remainingRetries == 0) {
throw RPCException("Cannot connect to server(s). Tried with all available servers.")
}
// Could not connect this time round - pause before giving another try.
Thread.sleep(retryInterval.toMillis())
// TODO - make the exponential retry factor configurable.
val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size
val remainingRetries = if (retries < 0) retries else (retries - 1)
return establishConnectionWithRetry((retryInterval * 10) / 9, nextRoundRobinIndex, remainingRetries)
val nextInterval = retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier
return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries)
}
override val proxy: CordaRPCOps
get() = current.proxy
@ -346,7 +344,7 @@ class ReconnectingCordaRPCOps private constructor(
}
}
throw MaxRpcRetryException(maxNumberOfAttempts, lastException)
throw MaxRpcRetryException(maxNumberOfAttempts, method, lastException)
}
private fun checkIfClosed() {

View File

@ -17,7 +17,7 @@ guavaVersion=28.0-jre
quasarVersion=0.7.12_r3
quasarClassifier=jdk8
# Quasar version to use with Java 11:
quasarVersion11=0.8.0_r3_rc1
quasarVersion11=0.8.0_r3
jdkClassifier11=jdk11
proguardVersion=6.1.1
bouncycastleVersion=1.60
@ -30,7 +30,7 @@ snakeYamlVersion=1.19
caffeineVersion=2.7.0
metricsVersion=4.1.0
metricsNewRelicVersion=1.1.1
djvmVersion=1.0-RC08
djvmVersion=1.0-RC09
deterministicRtVersion=1.0-RC02
openSourceBranch=https://github.com/corda/corda/blob/release/os/4.4
openSourceSamplesBranch=https://github.com/corda/samples/blob/release-V4

View File

@ -15,6 +15,7 @@ import net.corda.core.utilities.seconds
import org.slf4j.Logger
import rx.Observable
import rx.Observer
import rx.observers.Subscribers
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.io.ByteArrayOutputStream
@ -55,6 +56,7 @@ import java.util.zip.Deflater
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.collections.LinkedHashSet
import kotlin.math.roundToLong
import kotlin.reflect.KClass
import kotlin.reflect.full.createInstance
@ -75,6 +77,7 @@ infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(th
operator fun Duration.div(divider: Long): Duration = dividedBy(divider)
operator fun Duration.times(multiplicand: Long): Duration = multipliedBy(multiplicand)
operator fun Duration.times(multiplicand: Double): Duration = Duration.ofNanos((toNanos() * multiplicand).roundToLong())
/**
* Returns the single element matching the given [predicate], or `null` if the collection is empty, or throws exception
@ -170,8 +173,8 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
@DeleteForDJVM
fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
val subject = PublishSubject.create<T>()
subject.subscribe(this)
teeTo.forEach { subject.subscribe(it) }
subject.unsafeSubscribe(Subscribers.from(this))
teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) }
return subject
}

File diff suppressed because one or more lines are too long

View File

@ -34,10 +34,13 @@ has a stable API.
Non-public API (experimental)
-----------------------------
The following modules are not part of the Corda's public API and no backwards compatibility guarantees are provided. They are further categorized in 2 classes:
The following are not part of the Corda's public API and no backwards compatibility guarantees are provided:
* the incubating modules, for which we will do our best to minimise disruption to developers using them until we are able to graduate them into the public API
* the internal modules, which are not to be used, and will change without notice
* Incubating modules, for which we will do our best to minimise disruption to developers using them until we are able to graduate them into the public API
* Internal modules, which are not to be used, and will change without notice
* Anything defined in a package containing ``.internal`` (for example, ``net.corda.core.internal`` and sub-packages should
not be used)
* Any interfaces, classes or methods whose name contains the word ``internal`` or ``Internal``
The **finance module** was the first CorDapp ever written and is a legacy module. Although it is not a part of our API guarantees, we also
don't anticipate much future change to it. Users should use the tokens SDK instead.
@ -53,8 +56,7 @@ Corda incubating modules
Corda internal modules
~~~~~~~~~~~~~~~~~~~~~~
Everything else is internal and will change without notice, even deleted, and should not be used. This also includes any package that has
``.internal`` in it. So for example, ``net.corda.core.internal`` and sub-packages should not be used.
Every other module is internal and will change without notice, even deleted, and should not be used.
Some of the public modules may depend on internal modules, so be careful to not rely on these transitive dependencies. In particular, the
testing modules depend on the node module and so you may end having the node in your test classpath.

View File

@ -362,6 +362,7 @@ A more graceful form of reconnection is also available. This will:
- reconnect any existing ``Observable``\s after a reconnection, so that they keep emitting events to the existing subscriptions.
- block any RPC calls that arrive during a reconnection or any RPC calls that were not acknowledged at the point of reconnection and will execute them after the connection is re-established.
- by default continue retrying indefinitely until the connection is re-established. See ``CordaRPCClientConfiguration.maxReconnectAttempts`` for adjusting the number of retries.
More specifically, the behaviour in the second case is a bit more subtle:
@ -377,7 +378,7 @@ You can enable this graceful form of reconnection by using the ``gracefulReconne
* ``onDisconnect``: A callback handler that will be invoked every time the connection is disconnected.
* ``onReconnect``: A callback handler that will be invoked every time the connection is established again after a disconnection.
* ``maxAttempts``: The maximum number of attempts that will be performed per RPC operation. A negative value implies infinite retries. The default value is 5.
* ``maxAttempts``: The maximum number of attempts that will be performed per *RPC operation*. A negative value implies infinite retries. The default value is 5.
This can be used in the following way:

View File

@ -36,6 +36,10 @@ This prevents configuration errors when mixing keys containing ``.`` wrapped wit
By default the node will fail to start in presence of unknown property keys.
To alter this behaviour, the ``on-unknown-config-keys`` command-line argument can be set to ``IGNORE`` (default is ``FAIL``).
.. note:: As noted in the HOCON documentation, the default behaviour for resources referenced within a config file is to silently
ignore them if missing. Therefore it is strongly recommended to utilise the ``required`` syntax for includes. See HOCON documentation
for more information.
Overriding configuration values
-------------------------------

View File

@ -135,16 +135,9 @@ dependencies {
// Manifests: for reading stuff from the manifest file
compile "com.jcabi:jcabi-manifests:$jcabi_manifests_version"
compile("com.intellij:forms_rt:7.0.3") {
exclude group: "asm"
}
// Coda Hale's Metrics: for monitoring of key statistics
compile "io.dropwizard.metrics:metrics-jmx:$metrics_version"
// JimFS: in memory java.nio filesystem. Used for test and simulation utilities.
compile "com.google.jimfs:jimfs:1.1"
// TypeSafe Config: for simple and human friendly config files.
compile "com.typesafe:config:$typesafe_config_version"

View File

@ -134,7 +134,10 @@ class RpcReconnectTests {
Unit
}
val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect)
val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds)
val config = CordaRPCClientConfiguration.DEFAULT.copy(
connectionRetryInterval = 1.seconds,
connectionRetryIntervalMultiplier = 1.0
)
val client = CordaRPCClient(addressesForRpc, configuration = config)
val bankAReconnectingRPCConnection = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect)
val bankAReconnectingRpc = bankAReconnectingRPCConnection.proxy as ReconnectingCordaRPCOps

View File

@ -2,6 +2,7 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
@ -42,6 +43,8 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
class FlowRetryTest {
val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryIntervalMultiplier = 1.1)
@Before
fun resetCounters() {
InitiatorFlow.seen.clear()
@ -69,7 +72,7 @@ class FlowRetryTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
result
@ -87,7 +90,7 @@ class FlowRetryTest {
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}
}
@ -103,7 +106,7 @@ class FlowRetryTest {
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::RetryFlow).returnValue.getOrThrow()
}
result
@ -121,7 +124,7 @@ class FlowRetryTest {
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::ThrowingFlow).returnValue.getOrThrow()
}
result
@ -136,7 +139,7 @@ class FlowRetryTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> {
it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
@ -155,7 +158,7 @@ class FlowRetryTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> {
it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
@ -175,7 +178,7 @@ class FlowRetryTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<CordaRuntimeException> {
it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
@ -193,7 +196,7 @@ class FlowRetryTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}.withMessageStartingWith("User not authorized to perform RPC call")

View File

@ -74,15 +74,16 @@ open class SharedNodeCmdLineOptions {
}
errors.forEach { error ->
when (error) {
is ConfigException.IO -> logger.error(configFileNotFoundMessage(configFile))
is ConfigException.IO -> logger.error(configFileNotFoundMessage(configFile, error.cause))
else -> logger.error(error.message)
}
}
}
private fun configFileNotFoundMessage(configFile: Path): String {
private fun configFileNotFoundMessage(configFile: Path, cause: Throwable?): String {
return """
Unable to load the node config file from '$configFile'.
${cause?.message?.let { "Cause: $it" } ?: ""}
Try setting the --base-directory flag to change which directory the node
is looking in, or use the --config-file flag to specify it explicitly.

View File

@ -40,7 +40,7 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
val advertisedParametersHash = try {
networkMapClient?.getNetworkMap()?.payload?.networkParameterHash
} catch (e: Exception) {
logger.info("Unable to download network map", e)
logger.warn("Unable to download network map. Node will attempt to start using network-parameters file: $e")
// If NetworkMap is down while restarting the node, we should be still able to continue with parameters from file
null
}

View File

@ -10,9 +10,13 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import rx.Observable
import rx.observers.Subscribers
import rx.subjects.PublishSubject
import java.io.Closeable
import java.lang.RuntimeException
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class ObservablesTests {
private fun isInDatabaseTransaction() = contextTransactionOrNull != null
@ -186,6 +190,30 @@ class ObservablesTests {
assertThat(source3.hasCompleted()).isTrue()
}
/**
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
* eventually shut down all of the subscribers under that PublishSubjectState.
*/
@Test
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
val source1 = PublishSubject.create<Int>()
val source2 = PublishSubject.create<Int>()
var count = 0
source1.subscribe { count += it } // safe subscriber
source1.unsafeSubscribe(Subscribers.create { throw RuntimeException() }) // this subscriber should not shut down the above subscriber
assertFailsWith<RuntimeException> {
source1.tee(source2).onNext(1)
}
assertFailsWith<RuntimeException> {
source1.tee(source2).onNext(1)
}
assertEquals(2, count)
}
@Test
fun `combine tee and bufferUntilDatabaseCommit`() {
val database = createDatabase()

View File

@ -28,6 +28,9 @@ dependencies {
compile "com.squareup.okhttp3:okhttp:$okhttp_version"
compile project(':confidential-identities')
// JimFS: in memory java.nio filesystem. Used for test and simulation utilities.
compile "com.google.jimfs:jimfs:1.1"
testCompile "org.apache.commons:commons-lang3:3.9"
}