diff --git a/build.gradle b/build.gradle index c9e86fa819..415cda53c1 100644 --- a/build.gradle +++ b/build.gradle @@ -176,6 +176,7 @@ allprojects { tasks.withType(JavaCompile) { options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options" << "-parameters" + options.encoding = 'UTF-8' } tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 0728820cd5..5148aee673 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -20,7 +20,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId @@ -165,7 +165,7 @@ class RPCClientProxyHandler( private val observablesToReap = ThreadBox(object { var observables = ArrayList<InvocationId>() }) - private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext) + private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) private fun createRpcObservableMap(): RpcObservableMap { val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause -> @@ -569,7 +569,7 @@ private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<A private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?> /** - * Holds a context available during Kryo deserialisation of messages that are expected to contain Observables. + * Holds a context available during de-serialisation of messages that are expected to contain Observables. * * @param observableMap holds the Observables that are ultimately exposed to the user. * @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to. diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt index aa0e6618aa..df12645479 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt @@ -14,7 +14,6 @@ import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap import net.corda.serialization.internal.amqp.SerializerFactory import net.corda.serialization.internal.amqp.amqpMagic import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer -import java.util.concurrent.ConcurrentHashMap /** * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. @@ -54,7 +53,7 @@ class AMQPClientSerializationScheme( override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply { - register(RpcClientObservableSerializer) + register(RpcClientObservableDeSerializer) register(RpcClientCordaFutureSerializer(this)) register(RxNotificationSerializer(this)) } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt similarity index 91% rename from client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt rename to client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt index 5536e10b4c..52e9dc7cab 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt @@ -17,11 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger import javax.transaction.NotSupportedException /** - * Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, - * just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, + * De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, + * just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized, * what is actually sent is a reference to the observable that can then be subscribed to. */ -object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) { +object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) { private object RpcObservableContextKey fun createContext( @@ -83,7 +83,7 @@ object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*> } val observableContext = - context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext + context.properties[RpcClientObservableDeSerializer.RpcObservableContextKey] as ObservableContext if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index c2c8fba105..4595296fac 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -13,6 +13,7 @@ package net.corda.client.rpc import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps +import net.corda.core.utilities.seconds import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.node.User @@ -23,6 +24,7 @@ import net.corda.testing.node.internal.startRpcClient import org.apache.activemq.artemis.api.core.client.ClientSession import org.junit.Rule import org.junit.runners.Parameterized +import java.time.Duration open class AbstractRPCTest { @Rule @@ -54,19 +56,20 @@ open class AbstractRPCTest { ops: I, rpcUser: User = rpcTestUser, clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, - serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT - ): TestProxy<I> { + serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, + queueDrainTimeout: Duration = 5.seconds + ): TestProxy<I> { return when (mode) { RPCTestMode.InVm -> - startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { + startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration, queueDrainTimeout = queueDrainTimeout).flatMap { startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) }) + TestProxy(it) { startInVmArtemisSession(rpcUser.username, rpcUser.password) } } } RPCTestMode.Netty -> startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) -> startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) + TestProxy(it) { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) } } } }.get() diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt new file mode 100644 index 0000000000..72013ca955 --- /dev/null +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt @@ -0,0 +1,43 @@ +package net.corda.client.rpc + +import net.corda.core.messaging.RPCOps +import net.corda.core.utilities.millis +import net.corda.testing.node.internal.RPCDriverDSL +import net.corda.testing.node.internal.rpcDriver +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import rx.Observable +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals + +@RunWith(Parameterized::class) +class RPCHighThroughputObservableTests : AbstractRPCTest() { + + private fun RPCDriverDSL.testProxy(): TestOps { + return testProxy<TestOps>(TestOpsImpl(), queueDrainTimeout = 10.millis).ops + } + + internal interface TestOps : RPCOps { + + fun makeObservable(): Observable<Int> + } + + internal class TestOpsImpl : TestOps { + override val protocolVersion = 1 + + override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 } + } + + @Test + fun `simple observable`() { + rpcDriver { + val proxy = testProxy() + // This tests that the observations are transmitted correctly, also check that server side doesn't try to serialize the whole lot + // till client consumed some of the output produced. + val observations = proxy.makeObservable() + val observationsList = observations.take(4).toBlocking().toIterable().toList() + assertEquals(listOf(1, 2, 3, 4), observationsList) + } + } +} diff --git a/core/src/test/java/net/corda/core/contracts/AmountParsingTest.java b/core/src/test/java/net/corda/core/contracts/AmountParsingTest.java new file mode 100644 index 0000000000..1b23374ce2 --- /dev/null +++ b/core/src/test/java/net/corda/core/contracts/AmountParsingTest.java @@ -0,0 +1,15 @@ +package net.corda.core.contracts; + +import org.junit.Test; + +import static net.corda.finance.Currencies.POUNDS; +import static org.junit.Assert.assertEquals; + +public class AmountParsingTest { + + @Test + public void testGbpParse() { + assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP")); + assertEquals(POUNDS(11), Amount.parseCurrency("£11")); + } +} diff --git a/core/src/test/kotlin/net/corda/core/contracts/AmountTests.kt b/core/src/test/kotlin/net/corda/core/contracts/AmountTests.kt index f09f7de8fd..fbb066d5ed 100644 --- a/core/src/test/kotlin/net/corda/core/contracts/AmountTests.kt +++ b/core/src/test/kotlin/net/corda/core/contracts/AmountTests.kt @@ -172,4 +172,10 @@ class AmountTests { assertEquals(originalTotals[Pair(partyA, GBP)], newTotals3[Pair(partyA, GBP)]) assertEquals(originalTotals[Pair(partyB, GBP)], newTotals3[Pair(partyB, GBP)]) } + + @Test + fun testGbpParse() { + assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP")) + assertEquals(POUNDS(11), Amount.parseCurrency("£11")) + } } \ No newline at end of file diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index f09e7846ac..9d1128ec83 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -275,9 +275,12 @@ will be freed automatically. printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection is non-deterministic. +.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass +Observables as parameters to the RPC methods. + Futures ------- -A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to +A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. Versioning diff --git a/docs/source/node-database-access-h2.rst b/docs/source/node-database-access-h2.rst index e68ae41aeb..1bdc0c8d26 100644 --- a/docs/source/node-database-access-h2.rst +++ b/docs/source/node-database-access-h2.rst @@ -1,20 +1,37 @@ Database access when running H2 =============================== -When running a node using the H2 database, the node can be configured to expose its internal database over socket which -can be browsed using any tool that can use JDBC drivers. -The JDBC URL is printed during node startup to the log and will typically look like this: - ``jdbc:h2:tcp://localhost:31339/node`` +.. contents:: -The username and password can be altered in the :doc:`corda-configuration-file` but default to username "sa" and a blank -password. +Configuring the username and password +------------------------------------- -Any database browsing tool that supports JDBC can be used, but if you have IntelliJ Ultimate edition then there is -a tool integrated with your IDE. Just open the database window and add an H2 data source with the above details. -You will now be able to browse the tables and row data within them. +The database (a file called ``persistence.mv.db``) is created when the node first starts up. By default, it has an +administrator user ``sa`` and a blank password. The node requires the user with administrator permissions in order to +creates tables upon the first startup or after deploying new CorDapps with their own tables. The database password is +required only when the H2 database is exposed on non-localhost address (which is disabled by default). -By default, the node's H2 database is not exposed. This behaviour can be overridden by specifying the full network -address (interface and port), using the new ``h2Settings`` syntax in the node configuration. +This username and password can be changed in node configuration: + + .. sourcecode:: groovy + + dataSourceProperties = { + dataSource.user = [USER] + dataSource.password = [PASSWORD] + } + +Note that changing the user/password for the existing node in ``node.conf`` will not update them in the H2 database. +You need to log into the database first to create a new user or change a user's password. + +Connecting via a socket on a running node +----------------------------------------- + +Configuring the port +^^^^^^^^^^^^^^^^^^^^ + +Nodes backed by an H2 database will not expose this database by default. To configure the node to expose its internal +database over a socket which can be browsed using any tool that can use JDBC drivers, you must specify the full network +address (interface and port) using the ``h2Settings`` syntax in the node configuration. The configuration below will restrict the H2 service to run on ``localhost``: @@ -32,8 +49,8 @@ If you want H2 to auto-select a port (mimicking the old ``h2Port`` behaviour), y address: "localhost:0" } -If remote access is required, the address can be changed to ``0.0.0.0``. -The node requires a database password to be set when the database is exposed on the network interface to listen on. +If remote access is required, the address can be changed to ``0.0.0.0`` to listen on all interfaces. A password must be +set for the database user before doing so. .. sourcecode:: groovy @@ -44,5 +61,44 @@ The node requires a database password to be set when the database is exposed on dataSource.password : "strongpassword" } -The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database -will only be accessible on localhost. +.. note:: The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database will only + be accessible on localhost. + +Connecting to the database +^^^^^^^^^^^^^^^^^^^^^^^^^^ +The JDBC URL is printed during node startup to the log and will typically look like this: + + ``jdbc:h2:tcp://localhost:31339/node`` + +Any database browsing tool that supports JDBC can be used. + +Connecting using the H2 Console +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the + zip, and navigate in a terminal window to the unzipped folder + +* Change directories to the bin folder: ``cd h2/bin`` + +* Run the following command to open the h2 web console in a web browser tab: + + * Unix: ``sh h2.sh`` + * Windows: ``h2.bat`` + +* Paste the node's JDBC URL into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no + password (unless configured otherwise) + +You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an +interface for you to query them using SQL. + +.. _h2_relative_path: + +Connecting directly to the node's ``persistence.mv.db`` file +------------------------------------------------------------ + +You can also use the H2 Console to connect directly to the node's ``persistence.mv.db`` file. Ensure the node is off +before doing so, as access to the database file requires exclusive access. If the node is still running, the H2 Console +will return the following error: +``Database may be already in use: null. Possible solutions: close all other connection(s); use the server mode [90020-196]``. + + ``jdbc:h2:~/path/to/file/persistence`` diff --git a/docs/source/node-database.rst b/docs/source/node-database.rst index edc52e7922..a5240cd311 100644 --- a/docs/source/node-database.rst +++ b/docs/source/node-database.rst @@ -3,57 +3,7 @@ Node database Default in-memory database -------------------------- -By default, nodes store their data in an H2 database. -The database (a file persistence.mv.db) is created at the first node startup with the administrator user 'sa' and a blank password. -The user name and password can be changed in node configuration: - -.. sourcecode:: groovy - - dataSourceProperties = { - dataSource.user = [USER] - dataSource.password = [PASSWORD] - } - -Note, changing user/password for the existing node in node.conf will not update them in the H2 database, -you need to login to the database first to create new user or change the user password. -The database password is required only when the H2 database is exposed on non-localhost address (which is disabled by default). -The node requires the user with administrator permissions in order to creates tables upon the first startup -or after deplying new CorDapps with own tables. - -You can connect directly to a running node's database to see its -stored states, transactions and attachments as follows: - -* Enable the H2 database access in the node configuration using the following syntax: - - .. sourcecode:: groovy - - h2Settings { - address: "localhost:0" - } - -* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the zip, and - navigate in a terminal window to the unzipped folder -* Change directories to the bin folder: ``cd h2/bin`` - -* Run the following command to open the h2 web console in a web browser tab: - - * Unix: ``sh h2.sh`` - * Windows: ``h2.bat`` - -* Find the node's JDBC connection string. Each node outputs its connection string in the terminal - window as it starts up. In a terminal window where a node is running, look for the following string: - - ``Database connection URL is : jdbc:h2:tcp://10.18.0.150:56736/node`` - -* Paste this string into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no password. - -You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an -interface for you to query them using SQL. - -The default behaviour is to expose the H2 database on localhost. This can be overridden in the -node configuration using ``h2Settings.address`` and specifying the address of the network interface to listen on, -or simply using ``0.0.0.0:0`` to listen on all interfaces. The node requires a database password to be set when -the database is exposed on the network interface to listen on. +By default, nodes store their data in an H2 database. See :doc:`node-database-access-h2`. .. _standalone_database_config_examples_ref: diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt index bb952e9788..7cdd638152 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt @@ -65,6 +65,8 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>> input: DeserializationInput, context: SerializationContext ): Observable<*> { + // Note: this type of server Serializer is never meant to read postings arriving from clients. + // I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values. throw UnsupportedOperationException() } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index f6ff74e96d..13b1749da1 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -28,11 +28,7 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.deserialize -import net.corda.core.utilities.Try -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.days -import net.corda.core.utilities.debug -import net.corda.core.utilities.seconds +import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.serialization.amqp.RpcServerObservableSerializer @@ -257,9 +253,10 @@ class RPCServer( } } - fun close() { + fun close(queueDrainTimeout: Duration = 5.seconds) { + // Putting Stop message onto the queue will eventually make senderThread to stop. sendJobQueue.put(RpcSendJob.Stop) - senderThread?.join() + senderThread?.join(queueDrainTimeout.toMillis()) reaperScheduledFuture?.cancel(false) rpcExecutor?.shutdownNow() reaperExecutor?.shutdownNow() diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt index 7289a9a959..9a0a8d042c 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalListener import com.nhaarman.mockito_kotlin.mock -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Trace import net.corda.core.internal.ThreadBox import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme @@ -90,7 +90,7 @@ class RoundTripObservableSerializerTests { val serverSerializationContext = RpcServerObservableSerializer.createContext( serializationContext, serverObservableContext) - val clientSerializationContext = RpcClientObservableSerializer.createContext( + val clientSerializationContext = RpcClientObservableDeSerializer.createContext( serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt index c397920197..d17755fa4c 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -1,6 +1,6 @@ package net.corda.node.internal.serialization.testutils -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Trace import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext @@ -29,7 +29,7 @@ class AMQPRoundTripRPCSerializationScheme( ) { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { - register(RpcClientObservableSerializer) + register(RpcClientObservableDeSerializer) } } @@ -45,7 +45,7 @@ class AMQPRoundTripRPCSerializationScheme( fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = rpcClientSerializerFactory( - RpcClientObservableSerializer.createContext(serializationContext, observableContext) + RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) .withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) fun rpcServerSerializerFactory(observableContext: TestObservableContext) = diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index 5d257d020a..5619b0a86b 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -151,25 +151,22 @@ class DriverTests : IntegrationTest() { } @Test - fun `driver rejects multiple nodes with the same name`() { + fun `driver rejects multiple nodes with the same name parallel`() { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - assertThatThrownBy { - listOf( - newNode(DUMMY_BANK_A_NAME)(), - newNode(DUMMY_BANK_B_NAME)(), - newNode(DUMMY_BANK_A_NAME)() - ).transpose().getOrThrow() - }.isInstanceOf(IllegalArgumentException::class.java) + val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) + assertThatIllegalArgumentException().isThrownBy { + nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() + } } } @Test - fun `driver rejects multiple nodes with the same name parallel`() { + fun `driver rejects multiple nodes with the same organisation name`() { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) - assertThatThrownBy { - nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() - }.isInstanceOf(IllegalArgumentException::class.java) + newNode(CordaX500Name(commonName = "Notary", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow() + assertThatIllegalArgumentException().isThrownBy { + newNode(CordaX500Name(commonName = "Regulator", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow() + } } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 60a8505314..5ef10881b4 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -985,12 +985,15 @@ class DriverDSLImpl( * current nodes see everyone. */ private class NetworkVisibilityController { - private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>()) + private val nodeVisibilityHandles = ThreadBox(HashMap<String, VisibilityHandle>()) fun register(name: CordaX500Name): VisibilityHandle { val handle = VisibilityHandle() nodeVisibilityHandles.locked { - require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" } + require(name.organisation !in keys) { + "Node with organisation name ${name.organisation} is already started or starting" + } + put(name.organisation, handle) } return handle } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index e3100b2840..e1b5e200c8 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -27,6 +27,7 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.RPCOps import net.corda.core.node.NetworkParameters import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.services.messaging.RPCServer import net.corda.node.services.messaging.RPCServerConfiguration @@ -63,6 +64,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import java.lang.reflect.Method import java.nio.file.Path import java.nio.file.Paths +import java.time.Duration import java.util.* import net.corda.nodeapi.internal.config.User as InternalUser @@ -110,7 +112,6 @@ val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality // Use a global pool so that we can run RPC tests in parallel private val globalPortAllocation = PortAllocation.Incremental(10000) private val globalDebugPortAllocation = PortAllocation.Incremental(5005) -private val globalMonitorPortAllocation = PortAllocation.Incremental(7005) fun <A> rpcDriver( isDebug: Boolean = false, @@ -254,10 +255,11 @@ data class RPCDriverDSL( maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, - ops: I + ops: I, + queueDrainTimeout: Duration = 5.seconds ): CordaFuture<RpcServerHandle> { return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> - startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) + startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout) } } @@ -450,7 +452,7 @@ data class RPCDriverDSL( } } - fun startInVmRpcBroker( + private fun startInVmRpcBroker( rpcUser: User = rpcTestUser, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE @@ -478,7 +480,8 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, - brokerHandle: RpcBrokerHandle + brokerHandle: RpcBrokerHandle, + queueDrainTimeout: Duration = 5.seconds ): RpcServerHandle { val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply { minLargeMessageSize = MAX_MESSAGE_SIZE @@ -495,7 +498,7 @@ data class RPCDriverDSL( configuration ) driverDSL.shutdownManager.registerShutdown { - rpcServer.close() + rpcServer.close(queueDrainTimeout) locator.close() } rpcServer.start(brokerHandle.serverControl) @@ -530,7 +533,7 @@ class RandomRpcUser { Generator.sequence(method.parameters.map { generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") }).map { arguments -> - Call(method, { method.invoke(handle.proxy, *arguments.toTypedArray()) }) + Call(method) { method.invoke(handle.proxy, *arguments.toTypedArray()) } } } val callGenerator = Generator.choice(callGenerators)