OS->Ent merge - pull request #1318

OS->Ent merge
This commit is contained in:
Viktor Kolomeyko 2018-08-06 16:08:14 +01:00 committed by GitHub
commit 67a276197c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 193 additions and 115 deletions

View File

@ -176,6 +176,7 @@ allprojects {
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options" << "-parameters"
options.encoding = 'UTF-8'
}
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {

View File

@ -20,7 +20,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
@ -165,7 +165,7 @@ class RPCClientProxyHandler(
private val observablesToReap = ThreadBox(object {
var observables = ArrayList<InvocationId>()
})
private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext)
private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
@ -569,7 +569,7 @@ private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<A
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?>
/**
* Holds a context available during Kryo deserialisation of messages that are expected to contain Observables.
* Holds a context available during de-serialisation of messages that are expected to contain Observables.
*
* @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.

View File

@ -14,7 +14,6 @@ import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.amqp.amqpMagic
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
import java.util.concurrent.ConcurrentHashMap
/**
* When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation.
@ -54,7 +53,7 @@ class AMQPClientSerializationScheme(
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply {
register(RpcClientObservableSerializer)
register(RpcClientObservableDeSerializer)
register(RpcClientCordaFutureSerializer(this))
register(RxNotificationSerializer(this))
}

View File

@ -17,11 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.NotSupportedException
/**
* Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized,
* De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized,
* what is actually sent is a reference to the observable that can then be subscribed to.
*/
object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
private object RpcObservableContextKey
fun createContext(
@ -83,7 +83,7 @@ object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>
}
val observableContext =
context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext
context.properties[RpcClientObservableDeSerializer.RpcObservableContextKey] as ObservableContext
if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list")
if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}")

View File

@ -13,6 +13,7 @@ package net.corda.client.rpc
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.User
@ -23,6 +24,7 @@ import net.corda.testing.node.internal.startRpcClient
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule
import org.junit.runners.Parameterized
import java.time.Duration
open class AbstractRPCTest {
@Rule
@ -54,19 +56,20 @@ open class AbstractRPCTest {
ops: I,
rpcUser: User = rpcTestUser,
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT
): TestProxy<I> {
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
queueDrainTimeout: Duration = 5.seconds
): TestProxy<I> {
return when (mode) {
RPCTestMode.InVm ->
startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap {
startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration, queueDrainTimeout = queueDrainTimeout).flatMap {
startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) })
TestProxy(it) { startInVmArtemisSession(rpcUser.username, rpcUser.password) }
}
}
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) ->
startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
TestProxy(it) { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }
}
}
}.get()

View File

@ -0,0 +1,43 @@
package net.corda.client.rpc
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.rpcDriver
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import rx.Observable
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
@RunWith(Parameterized::class)
class RPCHighThroughputObservableTests : AbstractRPCTest() {
private fun RPCDriverDSL.testProxy(): TestOps {
return testProxy<TestOps>(TestOpsImpl(), queueDrainTimeout = 10.millis).ops
}
internal interface TestOps : RPCOps {
fun makeObservable(): Observable<Int>
}
internal class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
}
@Test
fun `simple observable`() {
rpcDriver {
val proxy = testProxy()
// This tests that the observations are transmitted correctly, also check that server side doesn't try to serialize the whole lot
// till client consumed some of the output produced.
val observations = proxy.makeObservable()
val observationsList = observations.take(4).toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observationsList)
}
}
}

View File

@ -0,0 +1,15 @@
package net.corda.core.contracts;
import org.junit.Test;
import static net.corda.finance.Currencies.POUNDS;
import static org.junit.Assert.assertEquals;
public class AmountParsingTest {
@Test
public void testGbpParse() {
assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP"));
assertEquals(POUNDS(11), Amount.parseCurrency("£11"));
}
}

View File

@ -172,4 +172,10 @@ class AmountTests {
assertEquals(originalTotals[Pair(partyA, GBP)], newTotals3[Pair(partyA, GBP)])
assertEquals(originalTotals[Pair(partyB, GBP)], newTotals3[Pair(partyB, GBP)])
}
@Test
fun testGbpParse() {
assertEquals(POUNDS(10), Amount.parseCurrency("10 GBP"))
assertEquals(POUNDS(11), Amount.parseCurrency("£11"))
}
}

View File

@ -275,9 +275,12 @@ will be freed automatically.
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
is non-deterministic.
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
Observables as parameters to the RPC methods.
Futures
-------
A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to
A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
Versioning

View File

@ -1,20 +1,37 @@
Database access when running H2
===============================
When running a node using the H2 database, the node can be configured to expose its internal database over socket which
can be browsed using any tool that can use JDBC drivers.
The JDBC URL is printed during node startup to the log and will typically look like this:
``jdbc:h2:tcp://localhost:31339/node``
.. contents::
The username and password can be altered in the :doc:`corda-configuration-file` but default to username "sa" and a blank
password.
Configuring the username and password
-------------------------------------
Any database browsing tool that supports JDBC can be used, but if you have IntelliJ Ultimate edition then there is
a tool integrated with your IDE. Just open the database window and add an H2 data source with the above details.
You will now be able to browse the tables and row data within them.
The database (a file called ``persistence.mv.db``) is created when the node first starts up. By default, it has an
administrator user ``sa`` and a blank password. The node requires the user with administrator permissions in order to
creates tables upon the first startup or after deploying new CorDapps with their own tables. The database password is
required only when the H2 database is exposed on non-localhost address (which is disabled by default).
By default, the node's H2 database is not exposed. This behaviour can be overridden by specifying the full network
address (interface and port), using the new ``h2Settings`` syntax in the node configuration.
This username and password can be changed in node configuration:
.. sourcecode:: groovy
dataSourceProperties = {
dataSource.user = [USER]
dataSource.password = [PASSWORD]
}
Note that changing the user/password for the existing node in ``node.conf`` will not update them in the H2 database.
You need to log into the database first to create a new user or change a user's password.
Connecting via a socket on a running node
-----------------------------------------
Configuring the port
^^^^^^^^^^^^^^^^^^^^
Nodes backed by an H2 database will not expose this database by default. To configure the node to expose its internal
database over a socket which can be browsed using any tool that can use JDBC drivers, you must specify the full network
address (interface and port) using the ``h2Settings`` syntax in the node configuration.
The configuration below will restrict the H2 service to run on ``localhost``:
@ -32,8 +49,8 @@ If you want H2 to auto-select a port (mimicking the old ``h2Port`` behaviour), y
address: "localhost:0"
}
If remote access is required, the address can be changed to ``0.0.0.0``.
The node requires a database password to be set when the database is exposed on the network interface to listen on.
If remote access is required, the address can be changed to ``0.0.0.0`` to listen on all interfaces. A password must be
set for the database user before doing so.
.. sourcecode:: groovy
@ -44,5 +61,44 @@ The node requires a database password to be set when the database is exposed on
dataSource.password : "strongpassword"
}
The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database
will only be accessible on localhost.
.. note:: The previous ``h2Port`` syntax is now deprecated. ``h2Port`` will continue to work but the database will only
be accessible on localhost.
Connecting to the database
^^^^^^^^^^^^^^^^^^^^^^^^^^
The JDBC URL is printed during node startup to the log and will typically look like this:
``jdbc:h2:tcp://localhost:31339/node``
Any database browsing tool that supports JDBC can be used.
Connecting using the H2 Console
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the
zip, and navigate in a terminal window to the unzipped folder
* Change directories to the bin folder: ``cd h2/bin``
* Run the following command to open the h2 web console in a web browser tab:
* Unix: ``sh h2.sh``
* Windows: ``h2.bat``
* Paste the node's JDBC URL into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no
password (unless configured otherwise)
You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an
interface for you to query them using SQL.
.. _h2_relative_path:
Connecting directly to the node's ``persistence.mv.db`` file
------------------------------------------------------------
You can also use the H2 Console to connect directly to the node's ``persistence.mv.db`` file. Ensure the node is off
before doing so, as access to the database file requires exclusive access. If the node is still running, the H2 Console
will return the following error:
``Database may be already in use: null. Possible solutions: close all other connection(s); use the server mode [90020-196]``.
``jdbc:h2:~/path/to/file/persistence``

View File

@ -3,57 +3,7 @@ Node database
Default in-memory database
--------------------------
By default, nodes store their data in an H2 database.
The database (a file persistence.mv.db) is created at the first node startup with the administrator user 'sa' and a blank password.
The user name and password can be changed in node configuration:
.. sourcecode:: groovy
dataSourceProperties = {
dataSource.user = [USER]
dataSource.password = [PASSWORD]
}
Note, changing user/password for the existing node in node.conf will not update them in the H2 database,
you need to login to the database first to create new user or change the user password.
The database password is required only when the H2 database is exposed on non-localhost address (which is disabled by default).
The node requires the user with administrator permissions in order to creates tables upon the first startup
or after deplying new CorDapps with own tables.
You can connect directly to a running node's database to see its
stored states, transactions and attachments as follows:
* Enable the H2 database access in the node configuration using the following syntax:
.. sourcecode:: groovy
h2Settings {
address: "localhost:0"
}
* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the zip, and
navigate in a terminal window to the unzipped folder
* Change directories to the bin folder: ``cd h2/bin``
* Run the following command to open the h2 web console in a web browser tab:
* Unix: ``sh h2.sh``
* Windows: ``h2.bat``
* Find the node's JDBC connection string. Each node outputs its connection string in the terminal
window as it starts up. In a terminal window where a node is running, look for the following string:
``Database connection URL is : jdbc:h2:tcp://10.18.0.150:56736/node``
* Paste this string into the JDBC URL field and click ``Connect``, using the default username (``sa``) and no password.
You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an
interface for you to query them using SQL.
The default behaviour is to expose the H2 database on localhost. This can be overridden in the
node configuration using ``h2Settings.address`` and specifying the address of the network interface to listen on,
or simply using ``0.0.0.0:0`` to listen on all interfaces. The node requires a database password to be set when
the database is exposed on the network interface to listen on.
By default, nodes store their data in an H2 database. See :doc:`node-database-access-h2`.
.. _standalone_database_config_examples_ref:

View File

@ -65,6 +65,8 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
input: DeserializationInput,
context: SerializationContext
): Observable<*> {
// Note: this type of server Serializer is never meant to read postings arriving from clients.
// I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values.
throw UnsupportedOperationException()
}

View File

@ -28,11 +28,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.days
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.core.utilities.*
import net.corda.node.internal.security.AuthorizingSubject
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
@ -257,9 +253,10 @@ class RPCServer(
}
}
fun close() {
fun close(queueDrainTimeout: Duration = 5.seconds) {
// Putting Stop message onto the queue will eventually make senderThread to stop.
sendJobQueue.put(RpcSendJob.Stop)
senderThread?.join()
senderThread?.join(queueDrainTimeout.toMillis())
reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow()

View File

@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener
import com.nhaarman.mockito_kotlin.mock
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace
import net.corda.core.internal.ThreadBox
import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
@ -90,7 +90,7 @@ class RoundTripObservableSerializerTests {
val serverSerializationContext = RpcServerObservableSerializer.createContext(
serializationContext, serverObservableContext)
val clientSerializationContext = RpcClientObservableSerializer.createContext(
val clientSerializationContext = RpcClientObservableDeSerializer.createContext(
serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id)

View File

@ -1,6 +1,6 @@
package net.corda.node.internal.serialization.testutils
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext
@ -29,7 +29,7 @@ class AMQPRoundTripRPCSerializationScheme(
) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
register(RpcClientObservableSerializer)
register(RpcClientObservableDeSerializer)
}
}
@ -45,7 +45,7 @@ class AMQPRoundTripRPCSerializationScheme(
fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) =
rpcClientSerializerFactory(
RpcClientObservableSerializer.createContext(serializationContext, observableContext)
RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
.withProperty(RPCApi.RpcRequestOrObservableIdKey, id))
fun rpcServerSerializerFactory(observableContext: TestObservableContext) =

View File

@ -151,25 +151,22 @@ class DriverTests : IntegrationTest() {
}
@Test
fun `driver rejects multiple nodes with the same name`() {
fun `driver rejects multiple nodes with the same name parallel`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
assertThatThrownBy {
listOf(
newNode(DUMMY_BANK_A_NAME)(),
newNode(DUMMY_BANK_B_NAME)(),
newNode(DUMMY_BANK_A_NAME)()
).transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
assertThatIllegalArgumentException().isThrownBy {
nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
}
}
}
@Test
fun `driver rejects multiple nodes with the same name parallel`() {
fun `driver rejects multiple nodes with the same organisation name`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
assertThatThrownBy {
nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
newNode(CordaX500Name(commonName = "Notary", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow()
assertThatIllegalArgumentException().isThrownBy {
newNode(CordaX500Name(commonName = "Regulator", organisation = "R3CEV", locality = "New York", country = "US"))().getOrThrow()
}
}
}

View File

@ -985,12 +985,15 @@ class DriverDSLImpl(
* current nodes see everyone.
*/
private class NetworkVisibilityController {
private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>())
private val nodeVisibilityHandles = ThreadBox(HashMap<String, VisibilityHandle>())
fun register(name: CordaX500Name): VisibilityHandle {
val handle = VisibilityHandle()
nodeVisibilityHandles.locked {
require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" }
require(name.organisation !in keys) {
"Node with organisation name ${name.organisation} is already started or starting"
}
put(name.organisation, handle)
}
return handle
}

View File

@ -27,6 +27,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.messaging.RPCServer
import net.corda.node.services.messaging.RPCServerConfiguration
@ -63,6 +64,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import java.lang.reflect.Method
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.util.*
import net.corda.nodeapi.internal.config.User as InternalUser
@ -110,7 +112,6 @@ val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality
// Use a global pool so that we can run RPC tests in parallel
private val globalPortAllocation = PortAllocation.Incremental(10000)
private val globalDebugPortAllocation = PortAllocation.Incremental(5005)
private val globalMonitorPortAllocation = PortAllocation.Incremental(7005)
fun <A> rpcDriver(
isDebug: Boolean = false,
@ -254,10 +255,11 @@ data class RPCDriverDSL(
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I
ops: I,
queueDrainTimeout: Duration = 5.seconds
): CordaFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker)
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout)
}
}
@ -450,7 +452,7 @@ data class RPCDriverDSL(
}
}
fun startInVmRpcBroker(
private fun startInVmRpcBroker(
rpcUser: User = rpcTestUser,
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE
@ -478,7 +480,8 @@ data class RPCDriverDSL(
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I,
brokerHandle: RpcBrokerHandle
brokerHandle: RpcBrokerHandle,
queueDrainTimeout: Duration = 5.seconds
): RpcServerHandle {
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
minLargeMessageSize = MAX_MESSAGE_SIZE
@ -495,7 +498,7 @@ data class RPCDriverDSL(
configuration
)
driverDSL.shutdownManager.registerShutdown {
rpcServer.close()
rpcServer.close(queueDrainTimeout)
locator.close()
}
rpcServer.start(brokerHandle.serverControl)
@ -530,7 +533,7 @@ class RandomRpcUser {
Generator.sequence(method.parameters.map {
generatorStore[it.type] ?: throw Exception("No generator for ${it.type}")
}).map { arguments ->
Call(method, { method.invoke(handle.proxy, *arguments.toTypedArray()) })
Call(method) { method.invoke(handle.proxy, *arguments.toTypedArray()) }
}
}
val callGenerator = Generator.choice(callGenerators)