OS->Ent merge

This commit is contained in:
Viktor Kolomeyko 2018-08-06 14:43:51 +01:00
commit f5989013f2
18 changed files with 193 additions and 115 deletions

View File

@ -176,6 +176,7 @@ allprojects {
tasks.withType(JavaCompile) { tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options" << "-parameters" options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options" << "-parameters"
options.encoding = 'UTF-8'
} }
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all { tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {

View File

@ -20,7 +20,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion import net.corda.client.rpc.RPCSinceVersion
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Actor import net.corda.core.context.Actor
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId import net.corda.core.context.Trace.InvocationId
@ -165,7 +165,7 @@ class RPCClientProxyHandler(
private val observablesToReap = ThreadBox(object { private val observablesToReap = ThreadBox(object {
var observables = ArrayList<InvocationId>() var observables = ArrayList<InvocationId>()
}) })
private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext) private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap { private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause -> val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
@ -569,7 +569,7 @@ private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<A
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?> private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?>
/** /**
* Holds a context available during Kryo deserialisation of messages that are expected to contain Observables. * Holds a context available during de-serialisation of messages that are expected to contain Observables.
* *
* @param observableMap holds the Observables that are ultimately exposed to the user. * @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to. * @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.

View File

@ -14,7 +14,6 @@ import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
import net.corda.serialization.internal.amqp.SerializerFactory import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.amqp.amqpMagic import net.corda.serialization.internal.amqp.amqpMagic
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
import java.util.concurrent.ConcurrentHashMap
/** /**
* When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation.
@ -54,7 +53,7 @@ class AMQPClientSerializationScheme(
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply { return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply {
register(RpcClientObservableSerializer) register(RpcClientObservableDeSerializer)
register(RpcClientCordaFutureSerializer(this)) register(RpcClientCordaFutureSerializer(this))
register(RxNotificationSerializer(this)) register(RxNotificationSerializer(this))
} }

View File

@ -17,11 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.NotSupportedException import javax.transaction.NotSupportedException
/** /**
* Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, * De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, * just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized,
* what is actually sent is a reference to the observable that can then be subscribed to. * what is actually sent is a reference to the observable that can then be subscribed to.
*/ */
object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) { object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
private object RpcObservableContextKey private object RpcObservableContextKey
fun createContext( fun createContext(
@ -83,7 +83,7 @@ object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>
} }
val observableContext = val observableContext =
context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext context.properties[RpcClientObservableDeSerializer.RpcObservableContextKey] as ObservableContext
if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list")
if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}")

View File

@ -13,6 +13,7 @@ package net.corda.client.rpc
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.User import net.corda.testing.node.User
@ -23,6 +24,7 @@ import net.corda.testing.node.internal.startRpcClient
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule import org.junit.Rule
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import java.time.Duration
open class AbstractRPCTest { open class AbstractRPCTest {
@Rule @Rule
@ -54,19 +56,20 @@ open class AbstractRPCTest {
ops: I, ops: I,
rpcUser: User = rpcTestUser, rpcUser: User = rpcTestUser,
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
queueDrainTimeout: Duration = 5.seconds
): TestProxy<I> { ): TestProxy<I> {
return when (mode) { return when (mode) {
RPCTestMode.InVm -> RPCTestMode.InVm ->
startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration, queueDrainTimeout = queueDrainTimeout).flatMap {
startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map { startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) }) TestProxy(it) { startInVmArtemisSession(rpcUser.username, rpcUser.password) }
} }
} }
RPCTestMode.Netty -> RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) -> startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) ->
startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) TestProxy(it) { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }
} }
} }
}.get() }.get()

View File

@ -0,0 +1,43 @@
package net.corda.client.rpc
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.rpcDriver
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import rx.Observable
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
@RunWith(Parameterized::class)
class RPCHighThroughputObservableTests : AbstractRPCTest() {
private fun RPCDriverDSL.testProxy(): TestOps {
return testProxy<TestOps>(TestOpsImpl(), queueDrainTimeout = 10.millis).ops
}
internal interface TestOps : RPCOps {
fun makeObservable(): Observable<Int>
}
internal class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
}
@Test
fun `simple observable`() {
rpcDriver {
val proxy = testProxy()
// This tests that the observations are transmitted correctly, also check that server side doesn't try to serialize the whole lot
// till client consumed some of the output produced.
val observations = proxy.makeObservable()
val observationsList = observations.take(4).toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observationsList)
}
}
}

View File

@ -0,0 +1,15 @@
package net.corda.core.contracts;
import org.junit.Test;
import static net.corda.finance.Currencies.POUNDS;
import static org.junit.Assert.assertEquals;
public class AmountParsingTest {
@Test
public void testGbpParse() {
assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP"));
assertEquals(POUNDS(11), Amount.parseCurrency("£11"));
}
}

View File

@ -172,4 +172,10 @@ class AmountTests {
assertEquals(originalTotals[Pair(partyA, GBP)], newTotals3[Pair(partyA, GBP)]) assertEquals(originalTotals[Pair(partyA, GBP)], newTotals3[Pair(partyA, GBP)])
assertEquals(originalTotals[Pair(partyB, GBP)], newTotals3[Pair(partyB, GBP)]) assertEquals(originalTotals[Pair(partyB, GBP)], newTotals3[Pair(partyB, GBP)])
} }
@Test
fun testGbpParse() {
assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP"))
assertEquals(POUNDS(11), Amount.parseCurrency("£11"))
}
} }

View File

@ -275,9 +275,12 @@ will be freed automatically.
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
is non-deterministic. is non-deterministic.
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
Observables as parameters to the RPC methods.
Futures Futures
------- -------
A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
Versioning Versioning

View File

@ -1,20 +1,37 @@
Database access when running H2 Database access when running H2
=============================== ===============================
When running a node using the H2 database, the node can be configured to expose its internal database over socket which
can be browsed using any tool that can use JDBC drivers.
The JDBC URL is printed during node startup to the log and will typically look like this:
``jdbc:h2:tcp://localhost:31339/node`` .. contents::
The username and password can be altered in the :doc:`corda-configuration-file` but default to username "sa" and a blank Configuring the username and password
password. -------------------------------------
Any database browsing tool that supports JDBC can be used, but if you have IntelliJ Ultimate edition then there is The database (a file called ``persistence.mv.db``) is created when the node first starts up. By default, it has an
a tool integrated with your IDE. Just open the database window and add an H2 data source with the above details. administrator user ``sa`` and a blank password. The node requires the user with administrator permissions in order to
You will now be able to browse the tables and row data within them. creates tables upon the first startup or after deploying new CorDapps with their own tables. The database password is
required only when the H2 database is exposed on non-localhost address (which is disabled by default).
By default, the node's H2 database is not exposed. This behaviour can be overridden by specifying the full network This username and password can be changed in node configuration:
address (interface and port), using the new ``h2Settings`` syntax in the node configuration.
.. sourcecode:: groovy
dataSourceProperties = {
dataSource.user = [USER]
dataSource.password = [PASSWORD]
}
Note that changing the user/password for the existing node in ``node.conf`` will not update them in the H2 database.
You need to log into the database first to create a new user or change a user's password.
Connecting via a socket on a running node
-----------------------------------------
Configuring the port
^^^^^^^^^^^^^^^^^^^^
Nodes backed by an H2 database will not expose this database by default. To configure the node to expose its internal
database over a socket which can be browsed using any tool that can use JDBC drivers, you must specify the full network
address (interface and port) using the ``h2Settings`` syntax in the node configuration.
The configuration below will restrict the H2 service to run on ``localhost``: The configuration below will restrict the H2 service to run on ``localhost``:
@ -32,8 +49,8 @@ If you want H2 to auto-select a port (mimicking the old ``h2Port`` behaviour), y
address: "localhost:0" address: "localhost:0"
} }
If remote access is required, the address can be changed to ``0.0.0.0``. If remote access is required, the address can be changed to ``0.0.0.0`` to listen on all interfaces. A password must be
The node requires a database password to be set when the database is exposed on the network interface to listen on. set for the database user before doing so.
.. sourcecode:: groovy .. sourcecode:: groovy
@ -44,5 +61,44 @@ The node requires a database password to be set when the database is exposed on
dataSource.password : "strongpassword" dataSource.password : "strongpassword"
} }
The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database .. note:: The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database will only
will only be accessible on localhost. be accessible on localhost.
Connecting to the database
^^^^^^^^^^^^^^^^^^^^^^^^^^
The JDBC URL is printed during node startup to the log and will typically look like this:
``jdbc:h2:tcp://localhost:31339/node``
Any database browsing tool that supports JDBC can be used.
Connecting using the H2 Console
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the
zip, and navigate in a terminal window to the unzipped folder
* Change directories to the bin folder: ``cd h2/bin``
* Run the following command to open the h2 web console in a web browser tab:
* Unix: ``sh h2.sh``
* Windows: ``h2.bat``
* Paste the node's JDBC URL into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no
password (unless configured otherwise)
You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an
interface for you to query them using SQL.
.. _h2_relative_path:
Connecting directly to the node's ``persistence.mv.db`` file
------------------------------------------------------------
You can also use the H2 Console to connect directly to the node's ``persistence.mv.db`` file. Ensure the node is off
before doing so, as access to the database file requires exclusive access. If the node is still running, the H2 Console
will return the following error:
``Database may be already in use: null. Possible solutions: close all other connection(s); use the server mode [90020-196]``.
``jdbc:h2:~/path/to/file/persistence``

View File

@ -3,57 +3,7 @@ Node database
Default in-memory database Default in-memory database
-------------------------- --------------------------
By default, nodes store their data in an H2 database. By default, nodes store their data in an H2 database. See :doc:`node-database-access-h2`.
The database (a file persistence.mv.db) is created at the first node startup with the administrator user 'sa' and a blank password.
The user name and password can be changed in node configuration:
.. sourcecode:: groovy
dataSourceProperties = {
dataSource.user = [USER]
dataSource.password = [PASSWORD]
}
Note, changing user/password for the existing node in node.conf will not update them in the H2 database,
you need to login to the database first to create new user or change the user password.
The database password is required only when the H2 database is exposed on non-localhost address (which is disabled by default).
The node requires the user with administrator permissions in order to creates tables upon the first startup
or after deplying new CorDapps with own tables.
You can connect directly to a running node's database to see its
stored states, transactions and attachments as follows:
* Enable the H2 database access in the node configuration using the following syntax:
.. sourcecode:: groovy
h2Settings {
address: "localhost:0"
}
* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the zip, and
navigate in a terminal window to the unzipped folder
* Change directories to the bin folder: ``cd h2/bin``
* Run the following command to open the h2 web console in a web browser tab:
* Unix: ``sh h2.sh``
* Windows: ``h2.bat``
* Find the node's JDBC connection string. Each node outputs its connection string in the terminal
window as it starts up. In a terminal window where a node is running, look for the following string:
``Database connection URL is : jdbc:h2:tcp://10.18.0.150:56736/node``
* Paste this string into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no password.
You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an
interface for you to query them using SQL.
The default behaviour is to expose the H2 database on localhost. This can be overridden in the
node configuration using ``h2Settings.address`` and specifying the address of the network interface to listen on,
or simply using ``0.0.0.0:0`` to listen on all interfaces. The node requires a database password to be set when
the database is exposed on the network interface to listen on.
.. _standalone_database_config_examples_ref: .. _standalone_database_config_examples_ref:

View File

@ -65,6 +65,8 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
input: DeserializationInput, input: DeserializationInput,
context: SerializationContext context: SerializationContext
): Observable<*> { ): Observable<*> {
// Note: this type of server Serializer is never meant to read postings arriving from clients.
// I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values.
throw UnsupportedOperationException() throw UnsupportedOperationException()
} }

View File

@ -28,11 +28,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try import net.corda.core.utilities.*
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.days
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.AuthorizingSubject
import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.serialization.amqp.RpcServerObservableSerializer import net.corda.node.serialization.amqp.RpcServerObservableSerializer
@ -257,9 +253,10 @@ class RPCServer(
} }
} }
fun close() { fun close(queueDrainTimeout: Duration = 5.seconds) {
// Putting Stop message onto the queue will eventually make senderThread to stop.
sendJobQueue.put(RpcSendJob.Stop) sendJobQueue.put(RpcSendJob.Stop)
senderThread?.join() senderThread?.join(queueDrainTimeout.toMillis())
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow() rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow() reaperExecutor?.shutdownNow()

View File

@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener import com.github.benmanes.caffeine.cache.RemovalListener
import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.mock
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
@ -90,7 +90,7 @@ class RoundTripObservableSerializerTests {
val serverSerializationContext = RpcServerObservableSerializer.createContext( val serverSerializationContext = RpcServerObservableSerializer.createContext(
serializationContext, serverObservableContext) serializationContext, serverObservableContext)
val clientSerializationContext = RpcClientObservableSerializer.createContext( val clientSerializationContext = RpcClientObservableDeSerializer.createContext(
serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id)

View File

@ -1,6 +1,6 @@
package net.corda.node.internal.serialization.testutils package net.corda.node.internal.serialization.testutils
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
@ -29,7 +29,7 @@ class AMQPRoundTripRPCSerializationScheme(
) { ) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { return SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
register(RpcClientObservableSerializer) register(RpcClientObservableDeSerializer)
} }
} }
@ -45,7 +45,7 @@ class AMQPRoundTripRPCSerializationScheme(
fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) =
rpcClientSerializerFactory( rpcClientSerializerFactory(
RpcClientObservableSerializer.createContext(serializationContext, observableContext) RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
.withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) .withProperty(RPCApi.RpcRequestOrObservableIdKey, id))
fun rpcServerSerializerFactory(observableContext: TestObservableContext) = fun rpcServerSerializerFactory(observableContext: TestObservableContext) =

View File

@ -151,25 +151,22 @@ class DriverTests : IntegrationTest() {
} }
@Test @Test
fun `driver rejects multiple nodes with the same name`() { fun `driver rejects multiple nodes with the same name parallel`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
assertThatThrownBy { val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
listOf( assertThatIllegalArgumentException().isThrownBy {
newNode(DUMMY_BANK_A_NAME)(), nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
newNode(DUMMY_BANK_B_NAME)(), }
newNode(DUMMY_BANK_A_NAME)()
).transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
} }
} }
@Test @Test
fun `driver rejects multiple nodes with the same name parallel`() { fun `driver rejects multiple nodes with the same organisation name`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) newNode(CordaX500Name(commonName = "Notary", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow()
assertThatThrownBy { assertThatIllegalArgumentException().isThrownBy {
nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() newNode(CordaX500Name(commonName = "Regulator", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java) }
} }
} }

View File

@ -985,12 +985,15 @@ class DriverDSLImpl(
* current nodes see everyone. * current nodes see everyone.
*/ */
private class NetworkVisibilityController { private class NetworkVisibilityController {
private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>()) private val nodeVisibilityHandles = ThreadBox(HashMap<String, VisibilityHandle>())
fun register(name: CordaX500Name): VisibilityHandle { fun register(name: CordaX500Name): VisibilityHandle {
val handle = VisibilityHandle() val handle = VisibilityHandle()
nodeVisibilityHandles.locked { nodeVisibilityHandles.locked {
require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" } require(name.organisation !in keys) {
"Node with organisation name ${name.organisation} is already started or starting"
}
put(name.organisation, handle)
} }
return handle return handle
} }

View File

@ -27,6 +27,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.messaging.RPCServer import net.corda.node.services.messaging.RPCServer
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
@ -63,6 +64,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import java.lang.reflect.Method import java.lang.reflect.Method
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Duration
import java.util.* import java.util.*
import net.corda.nodeapi.internal.config.User as InternalUser import net.corda.nodeapi.internal.config.User as InternalUser
@ -110,7 +112,6 @@ val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality
// Use a global pool so that we can run RPC tests in parallel // Use a global pool so that we can run RPC tests in parallel
private val globalPortAllocation = PortAllocation.Incremental(10000) private val globalPortAllocation = PortAllocation.Incremental(10000)
private val globalDebugPortAllocation = PortAllocation.Incremental(5005) private val globalDebugPortAllocation = PortAllocation.Incremental(5005)
private val globalMonitorPortAllocation = PortAllocation.Incremental(7005)
fun <A> rpcDriver( fun <A> rpcDriver(
isDebug: Boolean = false, isDebug: Boolean = false,
@ -254,10 +255,11 @@ data class RPCDriverDSL(
maxFileSize: Int = MAX_MESSAGE_SIZE, maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I ops: I,
queueDrainTimeout: Duration = 5.seconds
): CordaFuture<RpcServerHandle> { ): CordaFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout)
} }
} }
@ -450,7 +452,7 @@ data class RPCDriverDSL(
} }
} }
fun startInVmRpcBroker( private fun startInVmRpcBroker(
rpcUser: User = rpcTestUser, rpcUser: User = rpcTestUser,
maxFileSize: Int = MAX_MESSAGE_SIZE, maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE
@ -478,7 +480,8 @@ data class RPCDriverDSL(
nodeLegalName: CordaX500Name = fakeNodeLegalName, nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I, ops: I,
brokerHandle: RpcBrokerHandle brokerHandle: RpcBrokerHandle,
queueDrainTimeout: Duration = 5.seconds
): RpcServerHandle { ): RpcServerHandle {
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply { val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
minLargeMessageSize = MAX_MESSAGE_SIZE minLargeMessageSize = MAX_MESSAGE_SIZE
@ -495,7 +498,7 @@ data class RPCDriverDSL(
configuration configuration
) )
driverDSL.shutdownManager.registerShutdown { driverDSL.shutdownManager.registerShutdown {
rpcServer.close() rpcServer.close(queueDrainTimeout)
locator.close() locator.close()
} }
rpcServer.start(brokerHandle.serverControl) rpcServer.start(brokerHandle.serverControl)
@ -530,7 +533,7 @@ class RandomRpcUser {
Generator.sequence(method.parameters.map { Generator.sequence(method.parameters.map {
generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") generatorStore[it.type] ?: throw Exception("No generator for ${it.type}")
}).map { arguments -> }).map { arguments ->
Call(method, { method.invoke(handle.proxy, *arguments.toTypedArray()) }) Call(method) { method.invoke(handle.proxy, *arguments.toTypedArray()) }
} }
} }
val callGenerator = Generator.choice(callGenerators) val callGenerator = Generator.choice(callGenerators)