mirror of
https://github.com/corda/corda.git
synced 2025-03-15 00:36:49 +00:00
Merge pull request #1309 from corda/os-merge-93bb24e
O/S merge from 93bb24e
This commit is contained in:
commit
52c1ed274d
8
.idea/compiler.xml
generated
8
.idea/compiler.xml
generated
@ -45,6 +45,10 @@
|
||||
<module name="capsule-hsm_test" target="1.8" />
|
||||
<module name="client_main" target="1.8" />
|
||||
<module name="client_test" target="1.8" />
|
||||
<module name="com.r3.corda_buildSrc_main" target="1.8" />
|
||||
<module name="com.r3.corda_buildSrc_test" target="1.8" />
|
||||
<module name="com.r3.corda_canonicalizer_main" target="1.8" />
|
||||
<module name="com.r3.corda_canonicalizer_test" target="1.8" />
|
||||
<module name="common_main" target="1.8" />
|
||||
<module name="common_test" target="1.8" />
|
||||
<module name="confidential-identities_main" target="1.8" />
|
||||
@ -185,6 +189,10 @@
|
||||
<module name="loadtest_test" target="1.8" />
|
||||
<module name="mock_main" target="1.8" />
|
||||
<module name="mock_test" target="1.8" />
|
||||
<module name="net.corda_buildSrc_main" target="1.8" />
|
||||
<module name="net.corda_buildSrc_test" target="1.8" />
|
||||
<module name="net.corda_canonicalizer_main" target="1.8" />
|
||||
<module name="net.corda_canonicalizer_test" target="1.8" />
|
||||
<module name="network-bootstrapper_main" target="1.8" />
|
||||
<module name="network-bootstrapper_test" target="1.8" />
|
||||
<module name="network-verifier_main" target="1.8" />
|
||||
|
@ -85,7 +85,7 @@ buildscript {
|
||||
ext.snappy_version = '0.4'
|
||||
ext.fast_classpath_scanner_version = '2.12.3'
|
||||
ext.jcabi_manifests_version = '1.1'
|
||||
ext.picocli_version = '3.0.0'
|
||||
ext.picocli_version = '3.3.0'
|
||||
|
||||
// Name of the IntelliJ SDK created for the deterministic Java rt.jar.
|
||||
// ext.deterministic_idea_sdk = '1.8 (Deterministic)'
|
||||
|
@ -24,7 +24,7 @@
|
||||
<Appenders>
|
||||
<Console name="Console-Appender" target="SYSTEM_OUT">
|
||||
<PatternLayout>
|
||||
<ScriptPatternSelector defaultPattern="%highlight{[%level{length=5}] %date{HH:mm:ssZ} [%t] %c{2}.%method - %msg%n}{INFO=white,WARN=red,FATAL=bright red}">
|
||||
<ScriptPatternSelector defaultPattern="%highlight{[%level{length=5}] %date{HH:mm:ssZ} [%t] %c{2}.%method - %msg%n %throwable{0}}{INFO=white,WARN=red,FATAL=bright red}">
|
||||
<Script name="MDCSelector" language="javascript"><![CDATA[
|
||||
result = null;
|
||||
if (!logEvent.getContextData().size() == 0) {
|
||||
@ -43,7 +43,7 @@
|
||||
|
||||
<!-- Required for printBasicInfo -->
|
||||
<Console name="Console-Appender-Println" target="SYSTEM_OUT">
|
||||
<PatternLayout pattern="%msg%n" />
|
||||
<PatternLayout pattern="%msg%n %throwable{0}" />
|
||||
</Console>
|
||||
|
||||
<!-- Will generate up to 100 log files for a given day. During every rollover it will delete
|
||||
|
@ -48,14 +48,14 @@ class NoAnswer(private val closure: () -> Unit = {}) : FlowLogic<Unit>() {
|
||||
* Allows to register a flow of type [R] against an initiating flow of type [I].
|
||||
*/
|
||||
inline fun <I : FlowLogic<*>, reified R : FlowLogic<*>> TestStartedNode.registerInitiatedFlow(initiatingFlowType: KClass<I>, crossinline construct: (session: FlowSession) -> R) {
|
||||
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
|
||||
registerFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to register a flow of type [Answer] against an initiating flow of type [I], returning a valure of type [R].
|
||||
*/
|
||||
inline fun <I : FlowLogic<*>, reified R : Any> TestStartedNode.registerAnswer(initiatingFlowType: KClass<I>, value: R) {
|
||||
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
|
||||
registerFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -161,7 +161,7 @@ class AttachmentSerializationTest {
|
||||
}
|
||||
|
||||
private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) {
|
||||
server.internalRegisterFlowFactory(
|
||||
server.registerFlowFactory(
|
||||
ClientLogic::class.java,
|
||||
InitiatedFlowFactory.Core { ServerLogic(it, sendData) },
|
||||
ServerLogic::class.java,
|
||||
|
185
docs/source/network-builder.rst
Normal file
185
docs/source/network-builder.rst
Normal file
@ -0,0 +1,185 @@
|
||||
Corda Network Builder
|
||||
=====================
|
||||
|
||||
.. contents::
|
||||
|
||||
The Corda Network Builder is a tool for building Corda networks for testing purposes. It leverages Docker and
|
||||
containers to abstract the complexity of managing a distributed network away from the user.
|
||||
|
||||
Currently, the network you build will either be made up of local ``docker`` nodes *or* of nodes spread across Azure
|
||||
containers.
|
||||
|
||||
The Corda Network Builder can be downloaded from
|
||||
https://ci-artifactory.corda.r3cev.com/artifactory/corda-releases/net/corda/corda-network-builder/X.Y-corda/corda-network-builder-X.Y-corda-executable.jar,
|
||||
where ``X`` is the major Corda version and ``Y`` is the minor Corda version.
|
||||
|
||||
.. _pre-requisites:
|
||||
|
||||
Prerequisites
|
||||
-------------
|
||||
|
||||
* **Docker:** docker > 17.12.0-ce
|
||||
* **Azure:** authenticated az-cli >= 2.0 (see: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest)
|
||||
|
||||
.. _creating_the_base_nodes:
|
||||
|
||||
Creating the base nodes
|
||||
-----------------------
|
||||
|
||||
The network builder uses a set of nodes as the base for all other operations. A node is anything that satisfies
|
||||
the following layout:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
-
|
||||
-- node.conf
|
||||
-- corda.jar
|
||||
-- cordapps/
|
||||
|
||||
|
||||
An easy way to build a valid set of nodes is by running ``deployNodes``. In this document, we will be using
|
||||
the output of running ``deployNodes`` for the `Example CorDapp <https://github.com/corda/cordapp-example>`_:
|
||||
|
||||
1. ``git clone https://github.com/corda/cordapp-example``
|
||||
2. ``cd cordapp-example``
|
||||
3. ``./gradlew clean deployNodes``
|
||||
|
||||
Building a network via the command line
|
||||
---------------------------------------
|
||||
|
||||
Starting the nodes
|
||||
^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Quickstart Local Docker
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
1. ``cd kotlin-source/build/nodes``
|
||||
2. ``java -jar <path/to/network-builder-jar> -d .``
|
||||
|
||||
If you run ``docker ps`` to see the running containers, the following output should be displayed:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
|
||||
406868b4ba69 node-partyc:corda-network "/run-corda.sh" 17 seconds ago Up 16 seconds 0.0.0.0:32902->10003/tcp, 0.0.0.0:32895->10005/tcp, 0.0.0.0:32898->10020/tcp, 0.0.0.0:32900->12222/tcp partyc0
|
||||
4546a2fa8de7 node-partyb:corda-network "/run-corda.sh" 17 seconds ago Up 17 seconds 0.0.0.0:32896->10003/tcp, 0.0.0.0:32899->10005/tcp, 0.0.0.0:32901->10020/tcp, 0.0.0.0:32903->12222/tcp partyb0
|
||||
c8c44c515bdb node-partya:corda-network "/run-corda.sh" 17 seconds ago Up 17 seconds 0.0.0.0:32894->10003/tcp, 0.0.0.0:32897->10005/tcp, 0.0.0.0:32892->10020/tcp, 0.0.0.0:32893->12222/tcp partya0
|
||||
cf7ab689f493 node-notary:corda-network "/run-corda.sh" 30 seconds ago Up 31 seconds 0.0.0.0:32888->10003/tcp, 0.0.0.0:32889->10005/tcp, 0.0.0.0:32890->10020/tcp, 0.0.0.0:32891->12222/tcp notary0
|
||||
|
||||
Quickstart Remote Azure
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
1. ``cd kotlin-source/build/nodes``
|
||||
2. ``java -jar <path/to/network-builder-jar> -b AZURE -d .``
|
||||
|
||||
.. note:: The Azure configuration is handled by the az-cli utility. See the :ref:`pre-requisites`.
|
||||
|
||||
.. _interacting_with_the_nodes:
|
||||
|
||||
Interacting with the nodes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You can interact with the nodes by SSHing into them on the port that is mapped to 12222. For example, to SSH into the
|
||||
``partya0`` node, you would run:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
ssh user1@localhost -p 32893
|
||||
Password authentication
|
||||
Password:
|
||||
|
||||
|
||||
Welcome to the Corda interactive shell.
|
||||
Useful commands include 'help' to see what is available, and 'bye' to shut down the node.
|
||||
|
||||
>>> run networkMapSnapshot
|
||||
[
|
||||
{ "addresses" : [ "partya0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyA, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701330613 },
|
||||
{ "addresses" : [ "notary0:10020" ], "legalIdentitiesAndCerts" : [ "O=Notary, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701305115 },
|
||||
{ "addresses" : [ "partyc0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyC, L=Paris, C=FR" ], "platformVersion" : 3, "serial" : 1532701331608 },
|
||||
{ "addresses" : [ "partyb0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyB, L=New York, C=US" ], "platformVersion" : 3, "serial" : 1532701330118 }
|
||||
]
|
||||
|
||||
>>>
|
||||
|
||||
Adding additional nodes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
It is possible to add additional nodes to the network by reusing the nodes you built earlier. For example, to add a
|
||||
node by reusing the existing ``PartyA`` node, you would run:
|
||||
|
||||
``java -jar <network-builder-jar> --add "PartyA=O=PartyZ,L=London,C=GB"``
|
||||
|
||||
To confirm the node has been started correctly, run the following in the previously connected SSH session:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
Tue Jul 17 15:47:14 GMT 2018>>> run networkMapSnapshot
|
||||
[
|
||||
{ "addresses" : [ "partya0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyA, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701330613 },
|
||||
{ "addresses" : [ "notary0:10020" ], "legalIdentitiesAndCerts" : [ "O=Notary, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701305115 },
|
||||
{ "addresses" : [ "partyc0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyC, L=Paris, C=FR" ], "platformVersion" : 3, "serial" : 1532701331608 },
|
||||
{ "addresses" : [ "partyb0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyB, L=New York, C=US" ], "platformVersion" : 3, "serial" : 1532701330118 },
|
||||
{ "addresses" : [ "partya1:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyZ, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701630861 }
|
||||
]
|
||||
|
||||
Building a network in Graphical User Mode
|
||||
-----------------------------------------
|
||||
|
||||
The Corda Network Builder also provides a GUI for when automated interactions are not required. To launch it, run
|
||||
``java -jar <path/to/network-builder-jar> -g``.
|
||||
|
||||
Starting the nodes
|
||||
^^^^^^^^^^^^^^^^^^
|
||||
|
||||
1. Click ``Open nodes ...`` and select the folder where you built your nodes in :ref:`creating_the_base_nodes` and
|
||||
click ``Open``
|
||||
2. Select ``Local Docker`` or ``Azure``
|
||||
3. Click ``Build``
|
||||
|
||||
.. note:: The Azure configuration is handled by the az-cli utility. See the :ref:`pre-requisites`.
|
||||
|
||||
All the nodes should eventually move to a ``Status`` of ``INSTANTIATED``. If you run ``docker ps`` from the terminal to
|
||||
see the running containers, the following output should be displayed:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
|
||||
406868b4ba69 node-partyc:corda-network "/run-corda.sh" 17 seconds ago Up 16 seconds 0.0.0.0:32902->10003/tcp, 0.0.0.0:32895->10005/tcp, 0.0.0.0:32898->10020/tcp, 0.0.0.0:32900->12222/tcp partyc0
|
||||
4546a2fa8de7 node-partyb:corda-network "/run-corda.sh" 17 seconds ago Up 17 seconds 0.0.0.0:32896->10003/tcp, 0.0.0.0:32899->10005/tcp, 0.0.0.0:32901->10020/tcp, 0.0.0.0:32903->12222/tcp partyb0
|
||||
c8c44c515bdb node-partya:corda-network "/run-corda.sh" 17 seconds ago Up 17 seconds 0.0.0.0:32894->10003/tcp, 0.0.0.0:32897->10005/tcp, 0.0.0.0:32892->10020/tcp, 0.0.0.0:32893->12222/tcp partya0
|
||||
cf7ab689f493 node-notary:corda-network "/run-corda.sh" 30 seconds ago Up 31 seconds 0.0.0.0:32888->10003/tcp, 0.0.0.0:32889->10005/tcp, 0.0.0.0:32890->10020/tcp, 0.0.0.0:32891->12222/tcp notary0
|
||||
|
||||
Interacting with the nodes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
See :ref:`interacting_with_the_nodes`.
|
||||
|
||||
Adding additional nodes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
It is possible to add additional nodes to the network by reusing the nodes you built earlier. For example, to add a
|
||||
node by reusing the existing ``PartyA`` node, you would:
|
||||
|
||||
1. Select ``partya`` in the dropdown
|
||||
2. Click ``Add Instance``
|
||||
3. Specify the new node's X500 name and click ``OK``
|
||||
|
||||
If you click on ``partya`` in the pane, you should see an additional instance listed in the sidebar. To confirm the
|
||||
node has been started correctly, run the following in the previously connected SSH session:
|
||||
|
||||
.. sourcecode:: shell
|
||||
|
||||
Tue Jul 17 15:47:14 GMT 2018>>> run networkMapSnapshot
|
||||
[
|
||||
{ "addresses" : [ "partya0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyA, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701330613 },
|
||||
{ "addresses" : [ "notary0:10020" ], "legalIdentitiesAndCerts" : [ "O=Notary, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701305115 },
|
||||
{ "addresses" : [ "partyc0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyC, L=Paris, C=FR" ], "platformVersion" : 3, "serial" : 1532701331608 },
|
||||
{ "addresses" : [ "partyb0:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyB, L=New York, C=US" ], "platformVersion" : 3, "serial" : 1532701330118 },
|
||||
{ "addresses" : [ "partya1:10020" ], "legalIdentitiesAndCerts" : [ "O=PartyZ, L=London, C=GB" ], "platformVersion" : 3, "serial" : 1532701630861 }
|
||||
]
|
||||
|
||||
Shutting down the nodes
|
||||
-----------------------
|
||||
|
||||
Run ``docker kill $(docker ps -q)`` to kill all running Docker processes.
|
@ -7,6 +7,7 @@ wish to try the :doc:`blob-inspector`.
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
network-builder
|
||||
network-bootstrapper
|
||||
notary-healthcheck
|
||||
demobench
|
||||
|
@ -182,6 +182,9 @@ dependencies {
|
||||
// Jsh: A SSH implementation for tunneling inbound traffic via a relay
|
||||
compile group: 'com.jcraft', name: 'jsch', version: '0.1.54'
|
||||
|
||||
//Picocli for command line interface
|
||||
compile "info.picocli:picocli:$picocli_version"
|
||||
|
||||
// Integration test helpers
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
integrationTestCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
|
@ -118,15 +118,12 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
||||
* sweeping up the Node into the Kryo checkpoint serialization via any flows holding a reference to ServiceHub.
|
||||
*/
|
||||
// TODO Log warning if this node is a notary but not one of the ones specified in the network parameters, both for core and custom
|
||||
|
||||
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
|
||||
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
|
||||
abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val platformClock: CordaClock,
|
||||
protected val versionInfo: VersionInfo,
|
||||
protected val cordappLoader: CordappLoader,
|
||||
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||
val platformClock: CordaClock,
|
||||
protected val versionInfo: VersionInfo,
|
||||
protected val cordappLoader: CordappLoader,
|
||||
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||
|
||||
protected abstract val log: Logger
|
||||
|
||||
@ -190,6 +187,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize()
|
||||
val contractUpgradeService = ContractUpgradeServiceImpl().tokenize()
|
||||
val auditService = DummyAuditService().tokenize()
|
||||
@Suppress("LeakingThis")
|
||||
protected val network: MessagingService = makeMessagingService().tokenize()
|
||||
val services = ServiceHubInternalImpl().tokenize()
|
||||
@Suppress("LeakingThis")
|
||||
val smm = makeStateMachineManager()
|
||||
@ -204,8 +203,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
configuration.drainingModePollPeriod,
|
||||
unfinishedSchedules = busyNodeLatch
|
||||
).tokenize().closeOnStop()
|
||||
// TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance
|
||||
protected lateinit var network: MessagingService
|
||||
|
||||
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
|
||||
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
|
||||
@ -301,10 +298,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
log.info("Node starting up ...")
|
||||
|
||||
// TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not
|
||||
// possible (yet) to due restriction from MockNode
|
||||
network = makeMessagingService().tokenize()
|
||||
|
||||
val trustRoot = initKeyStore()
|
||||
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
initialiseJVMAgents()
|
||||
|
@ -14,7 +14,6 @@ import com.codahale.metrics.JmxReporter
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.Emoji
|
||||
@ -23,7 +22,6 @@ import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.errors.AddressBindingException
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.NetworkParameters
|
||||
@ -49,6 +47,7 @@ import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.StartedNodeServices
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
@ -68,6 +67,7 @@ import net.corda.serialization.internal.*
|
||||
import org.h2.jdbc.JdbcSQLException
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import rx.schedulers.Schedulers
|
||||
import java.net.BindException
|
||||
@ -76,7 +76,6 @@ import java.time.Clock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
import kotlin.system.exitProcess
|
||||
import rx.Observable
|
||||
|
||||
class NodeWithInfo(val node: Node, val info: NodeInfo) {
|
||||
val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by node.services, FlowStarter by node.flowStarter {}
|
||||
@ -209,7 +208,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
override fun startMessagingService(rpcOps: RPCOps, nodeInfo: NodeInfo, myNotaryIdentity: PartyAndCertificate?, networkParameters: NetworkParameters) {
|
||||
require(nodeInfo.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
|
||||
|
||||
val client = network as P2PMessagingClient
|
||||
network as P2PMessagingClient
|
||||
|
||||
// Construct security manager reading users data either from the 'security' config section
|
||||
// if present or from rpcUsers list if the former is missing from config.
|
||||
@ -235,7 +234,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
|
||||
val externalBridge = configuration.enterpriseConfiguration.externalBridge
|
||||
val bridgeControlListener = if (externalBridge == null || !externalBridge) {
|
||||
BridgeControlListener(configuration, client.serverAddress, networkParameters.maxMessageSize)
|
||||
BridgeControlListener(configuration, network.serverAddress, networkParameters.maxMessageSize)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
@ -270,8 +269,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
closeOnStop()
|
||||
init(rpcOps, securityManager)
|
||||
}
|
||||
client.closeOnStop()
|
||||
client.start(
|
||||
network.closeOnStop()
|
||||
network.start(
|
||||
myIdentity = nodeInfo.legalIdentities[0].owningKey,
|
||||
serviceIdentity = if (nodeInfo.legalIdentities.size == 1) null else nodeInfo.legalIdentities[1].owningKey,
|
||||
advertisedAddress = nodeInfo.addresses.single(),
|
||||
|
@ -82,10 +82,7 @@ open class NodeStartup(val args: Array<String>) {
|
||||
|
||||
val registrationMode = checkRegistrationMode()
|
||||
val cmdlineOptions: CmdLineOptions = if (registrationMode && !args.contains("--initial-registration")) {
|
||||
"Node was started before with `--initial-registration`, but the registration was not completed.\nResuming registration.".let {
|
||||
println(it)
|
||||
logger.info(it)
|
||||
}
|
||||
println("Node was started before with `--initial-registration`, but the registration was not completed.\nResuming registration.")
|
||||
// Pretend that the node was started with `--initial-registration` to help prevent user error.
|
||||
NodeArgsParser().parseOrExit(*args.plus("--initial-registration"))
|
||||
} else {
|
||||
|
@ -156,13 +156,6 @@ interface ReceivedMessage : Message {
|
||||
val isSessionInit: Boolean
|
||||
}
|
||||
|
||||
/** A singleton that's useful for validating topic strings */
|
||||
object TopicStringValidator {
|
||||
private val regex = "[a-zA-Z0-9.]+".toPattern()
|
||||
/** @throws IllegalArgumentException if the given topic contains invalid characters */
|
||||
fun check(tag: String) = require(regex.matcher(tag).matches())
|
||||
}
|
||||
|
||||
/**
|
||||
* This handler is used to implement exactly-once delivery of an external event on top of an at-least-once delivery. This is done
|
||||
* using two hooks that are called from the event processor, one called from the database transaction committing the
|
||||
|
@ -111,7 +111,7 @@ class SingleThreadedStateMachineManager(
|
||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||
private val transitionExecutor = makeTransitionExecutor()
|
||||
private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID // This is a getter since AbstractNode.network is still lateinit
|
||||
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
||||
|
||||
private var checkpointSerializationContext: SerializationContext? = null
|
||||
private var actionExecutor: ActionExecutor? = null
|
||||
|
119
node/src/main/kotlin/net/corda/node/utilities/CLIUtils.kt
Normal file
119
node/src/main/kotlin/net/corda/node/utilities/CLIUtils.kt
Normal file
@ -0,0 +1,119 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.jcabi.manifests.Manifests
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.internal.isReadable
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.logging.log4j.Level
|
||||
import picocli.CommandLine
|
||||
import picocli.CommandLine.*
|
||||
import java.nio.file.Path
|
||||
import kotlin.system.exitProcess
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Something heavily used in network services, I am not sure it's of much use in corda, but who knows. Definitely it was the key to making DevOps happy.
|
||||
* Add it as
|
||||
* `@CommandLine.Mixin
|
||||
* lateinit var configParser: ConfigFilePathArgsParser`
|
||||
*
|
||||
* in your command class and then validate()
|
||||
*/
|
||||
@Command(description = ["Parse configuration file. Checks if given configuration file exists"])
|
||||
class ConfigFilePathArgsParser : Validated {
|
||||
@Option(names = ["--config-file", "-f"], required = true, paramLabel = "FILE", description = ["The path to the config file"])
|
||||
lateinit var configFile: Path
|
||||
|
||||
override fun validator(): List<String> {
|
||||
val res = mutableListOf<String>()
|
||||
if(!configFile.exists()) res += "Config file ${configFile.toAbsolutePath().normalize()} does not exist!"
|
||||
if(!configFile.isReadable) res += "Config file ${configFile.toAbsolutePath().normalize()} is not readable"
|
||||
return res
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple version printing when command is called with --version or -V flag. Assuming that we reuse Corda-Release-Version and Corda-Revision
|
||||
* in the manifest file.
|
||||
*/
|
||||
class CordaVersionProvider : IVersionProvider {
|
||||
override fun getVersion(): Array<String> {
|
||||
return if (Manifests.exists("Corda-Release-Version") && Manifests.exists("Corda-Revision")) {
|
||||
arrayOf("Version: ${Manifests.read("Corda-Release-Version")}", "Revision: ${Manifests.read("Corda-Revision")}")
|
||||
} else {
|
||||
arrayOf("No version data is available in the MANIFEST file.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Usually when we have errors in some command line flags that are not handled by picocli (e.g. non existing file). Error is thrown
|
||||
* and no CommandLine help afterwards. This can be called from run() method.
|
||||
*/
|
||||
interface Validated {
|
||||
companion object {
|
||||
val logger = contextLogger()
|
||||
const val RED = "\u001B[31m"
|
||||
const val RESET = "\u001B[0m"
|
||||
}
|
||||
/**
|
||||
* Check that provided command line parameters are valid, e.g. check file existence. Return list of error strings.
|
||||
*/
|
||||
fun validator(): List<String>
|
||||
|
||||
/**
|
||||
* Function that provides nice error handing of command line validation.
|
||||
*/
|
||||
fun validate() {
|
||||
val errors = validator()
|
||||
if (errors.isNotEmpty()) {
|
||||
logger.error(RED + "Exceptions when parsing command line arguments:")
|
||||
logger.error(errors.joinToString("\n") + RESET)
|
||||
CommandLine(this).usage(System.err)
|
||||
exitProcess(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple base class for handling help, version, verbose and logging-level commands.
|
||||
* As versionProvider information from the MANIFEST file is used. It can be overwritten by custom version providers (see: Node)
|
||||
* Picocli will prioritise versionProvider from the `@Command` annotation on the subclass, see: https://picocli.info/#_reuse_combinations
|
||||
*/
|
||||
@Command(mixinStandardHelpOptions = true,
|
||||
versionProvider = CordaVersionProvider::class,
|
||||
sortOptions = false,
|
||||
synopsisHeading = "%n@|bold,underline Usage|@:%n%n",
|
||||
descriptionHeading = "%n@|bold,underline Description|@:%n%n",
|
||||
parameterListHeading = "%n@|bold,underline Parameters|@:%n%n",
|
||||
optionListHeading = "%n@|bold,underline Options|@:%n%n",
|
||||
commandListHeading = "%n@|bold,underline Commands|@:%n%n")
|
||||
abstract class ArgsParser {
|
||||
@Option(names = [ "-v", "--verbose" ], description = ["If set, prints logging to the console as well as to a file."])
|
||||
var verbose: Boolean = false
|
||||
|
||||
@Option(names = ["--logging-level"],
|
||||
// TODO For some reason I couldn't make picocli COMPLETION-CANDIDATES work
|
||||
description = ["Enable logging at this level and higher. Defaults to INFO. Possible values: OFF, INFO, WARN, TRACE, DEBUG, ERROR, FATAL, ALL"],
|
||||
converter = [LoggingLevelConverter::class])
|
||||
var loggingLevel: Level = Level.INFO
|
||||
|
||||
// This needs to be called before loggers (See: NodeStartup.kt:51 logger called by lazy, initLogging happens before).
|
||||
// Node's logging is more rich. In corda configurations two properties, defaultLoggingLevel and consoleLogLevel, are usually used.
|
||||
protected open fun initLogging() {
|
||||
val loggingLevel = loggingLevel.name().toLowerCase(Locale.ENGLISH)
|
||||
System.setProperty("defaultLogLevel", loggingLevel) // These properties are referenced from the XML config file.
|
||||
if (verbose) {
|
||||
System.setProperty("consoleLogLevel", loggingLevel)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converter from String to log4j logging Level.
|
||||
*/
|
||||
class LoggingLevelConverter : ITypeConverter<Level> {
|
||||
override fun convert(value: String?): Level {
|
||||
return value?.let { Level.getLevel(it) } ?: throw TypeConversionException("Unknown option for --logging-level: $value")
|
||||
}
|
||||
}
|
@ -1,125 +0,0 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.AllPossibleRecipients
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.TopicStringValidator
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFails
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class InMemoryMessagingTests {
|
||||
lateinit var mockNet: InternalMockNetwork
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
mockNet = InternalMockNetwork()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `topic string validation`() {
|
||||
TopicStringValidator.check("this.is.ok")
|
||||
TopicStringValidator.check("this.is.OkAlso")
|
||||
assertFails {
|
||||
TopicStringValidator.check("this.is.not-ok")
|
||||
}
|
||||
assertFails {
|
||||
TopicStringValidator.check("")
|
||||
}
|
||||
assertFails {
|
||||
TopicStringValidator.check("this.is not ok") // Spaces
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun basics() {
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
val node3 = mockNet.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
var finalDelivery: Message? = null
|
||||
node2.network.addMessageHandler("test.topic") { msg, _, _ ->
|
||||
node2.network.send(msg, node3.network.myAddress)
|
||||
}
|
||||
node3.network.addMessageHandler("test.topic") { msg, _, _ ->
|
||||
finalDelivery = msg
|
||||
}
|
||||
|
||||
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
|
||||
node1.network.send(node1.network.createMessage("test.topic", data = bits), node2.network.myAddress)
|
||||
|
||||
mockNet.runNetwork(rounds = 1)
|
||||
|
||||
assertTrue(Arrays.equals(finalDelivery!!.data.bytes, bits))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun broadcast() {
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
val node3 = mockNet.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
|
||||
var counter = 0
|
||||
listOf(node1, node2, node3).forEach { it.network.addMessageHandler("test.topic") { _, _, _ -> counter++ } }
|
||||
node1.network.send(node2.network.createMessage("test.topic", data = bits), rigorousMock<AllPossibleRecipients>())
|
||||
mockNet.runNetwork(rounds = 1)
|
||||
assertEquals(3, counter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that unhandled messages in the received queue are skipped and the next message processed, rather than
|
||||
* causing processing to return null as if there was no message.
|
||||
*/
|
||||
@Test
|
||||
fun `skip unhandled messages`() {
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
var received = 0
|
||||
|
||||
node1.network.addMessageHandler("valid_message") { _, _, _ ->
|
||||
received++
|
||||
}
|
||||
|
||||
val invalidMessage = node2.network.createMessage("invalid_message", data = ByteArray(1))
|
||||
val validMessage = node2.network.createMessage("valid_message", data = ByteArray(1))
|
||||
node2.network.send(invalidMessage, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(0, received)
|
||||
|
||||
node2.network.send(validMessage, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(1, received)
|
||||
|
||||
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
|
||||
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
|
||||
val invalidMessage2 = node2.network.createMessage("invalid_message", data = ByteArray(1))
|
||||
val validMessage2 = node2.network.createMessage("valid_message", data = ByteArray(1))
|
||||
node2.network.send(invalidMessage2, node1.network.myAddress)
|
||||
node2.network.send(validMessage2, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(2, received)
|
||||
}
|
||||
}
|
@ -57,7 +57,6 @@ import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.TEST_TX_TIME
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.vault.VaultFiller
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.ledger
|
||||
@ -235,7 +234,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
aliceNode.internals.disableDBCloseOnStop()
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
|
||||
val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle
|
||||
val bobAddr = bobNode.network.myAddress
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
|
||||
val notary = mockNet.defaultNotaryIdentity
|
||||
|
@ -831,7 +831,7 @@ private inline fun <reified P : FlowLogic<*>> TestStartedNode.registerFlowFactor
|
||||
initiatingFlowClass: KClass<out FlowLogic<*>>,
|
||||
initiatedFlowVersion: Int = 1,
|
||||
noinline flowFactory: (FlowSession) -> P): CordaFuture<P> {
|
||||
val observable = internalRegisterFlowFactory(
|
||||
val observable = registerFlowFactory(
|
||||
initiatingFlowClass.java,
|
||||
InitiatedFlowFactory.CorDapp(initiatedFlowVersion, "", flowFactory),
|
||||
P::class.java,
|
||||
|
@ -15,11 +15,6 @@ import net.corda.node.services.FinalityHandler
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.MessagingServiceSpy
|
||||
import net.corda.testing.node.internal.newContext
|
||||
import net.corda.testing.node.internal.setMessagingServiceSpy
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
@ -79,7 +74,7 @@ class RetryFlowMockTest {
|
||||
fun `Retry does not set senderUUID`() {
|
||||
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
|
||||
val partyB = nodeB.info.legalIdentities.first()
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
messagesSent.add(message)
|
||||
messagingService.send(message, target)
|
||||
@ -95,7 +90,7 @@ class RetryFlowMockTest {
|
||||
fun `Restart does not set senderUUID`() {
|
||||
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
|
||||
val partyB = nodeB.info.legalIdentities.first()
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
messagesSent.add(message)
|
||||
messagingService.send(message, target)
|
||||
@ -109,7 +104,7 @@ class RetryFlowMockTest {
|
||||
assertNotNull(messagesSent.first().senderUUID)
|
||||
nodeA = mockNet.restartNode(nodeA)
|
||||
// This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going.
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
|
||||
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
messagesSent.add(message)
|
||||
messagingService.send(message, target)
|
||||
@ -117,7 +112,7 @@ class RetryFlowMockTest {
|
||||
})
|
||||
// Now short circuit the iterations so the flow finishes soon.
|
||||
KeepSendingFlow.count.set(count - 2)
|
||||
while (nodeA.smm.allStateMachines.size > 0) {
|
||||
while (nodeA.smm.allStateMachines.isNotEmpty()) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
assertNull(messagesSent.last().senderUUID)
|
||||
|
@ -14,18 +14,8 @@ import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.NotarisationPayload
|
||||
import net.corda.core.flows.NotarisationRequest
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.notary.generateSignature
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
@ -45,13 +35,6 @@ import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.TestClock
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.InMemoryMessage
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.MessagingServiceSpy
|
||||
import net.corda.testing.node.internal.setMessagingServiceSpy
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
@ -318,7 +301,7 @@ class ValidatingNotaryServiceTests {
|
||||
}
|
||||
|
||||
private fun runNotarisationAndInterceptClientPayload(payloadModifier: (NotarisationPayload) -> NotarisationPayload) {
|
||||
aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy(aliceNode.network) {
|
||||
aliceNode.setMessagingServiceSpy(object : MessagingServiceSpy() {
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
val messageData = message.data.deserialize<Any>() as? InitialSessionMessage
|
||||
val payload = messageData?.firstPayload!!.deserialize()
|
||||
@ -328,7 +311,6 @@ class ValidatingNotaryServiceTests {
|
||||
val alteredMessageData = messageData.copy(firstPayload = alteredPayload.serialize())
|
||||
val alteredMessage = InMemoryMessage(message.topic, OpaqueBytes(alteredMessageData.serialize().bytes), message.uniqueMessageId)
|
||||
messagingService.send(alteredMessage, target)
|
||||
|
||||
} else {
|
||||
messagingService.send(message, target)
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ class NodePair(private val mockNet: InternalMockNetwork) {
|
||||
private set
|
||||
|
||||
fun <T> communicate(clientLogic: AbstractClientLogic<T>, rebootClient: Boolean): FlowStateMachine<T> {
|
||||
server.internalRegisterFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false)
|
||||
server.registerFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false)
|
||||
client.services.startFlow(clientLogic)
|
||||
while (!serverRunning.get()) mockNet.runNetwork(1)
|
||||
if (rebootClient) {
|
||||
|
@ -62,8 +62,9 @@ import net.corda.testing.dsl.TestTransactionDSLInterpreter
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.TEST_TX_TIME
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.*
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.ledger
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -245,7 +246,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
aliceNode.internals.disableDBCloseOnStop()
|
||||
bobNode.internals.disableDBCloseOnStop()
|
||||
|
||||
val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle
|
||||
val bobAddr = bobNode.network.myAddress
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
|
||||
val notary = mockNet.defaultNotaryIdentity
|
||||
|
@ -11,6 +11,7 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
|
||||
|
||||
@StartableByRPC
|
||||
@ -23,7 +24,7 @@ class TestNotaryFlow : FlowLogic<String>() {
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(ISSUING, ISSUED, DESTROYING, FINALIZED)
|
||||
|
||||
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
val issueBuilder = TransactionBuilder()
|
||||
val notary = serviceHub.networkMapCache.notaryIdentities.first()
|
||||
@ -62,4 +63,4 @@ object NotaryTestCommand : CommandData
|
||||
class NotaryTestContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,46 +10,37 @@
|
||||
|
||||
package net.corda.testing.node
|
||||
|
||||
import net.corda.core.CordaInternal
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.AllPossibleRecipients
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.testing.node.internal.InMemoryMessage
|
||||
import net.corda.testing.node.internal.InternalMockMessagingService
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.testing.node.internal.MockNodeMessagingService
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.schedule
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
|
||||
* An in-memory network allows you to manufacture [InternalMockMessagingService]s for a set of participants. Each
|
||||
* [InternalMockMessagingService] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
* An in-memory network allows you to manufacture [MockNodeMessagingService]s for a set of participants. Each
|
||||
* [MockNodeMessagingService] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
|
||||
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
|
||||
* testing).
|
||||
@ -67,16 +58,16 @@ class InMemoryMessagingNetwork private constructor(
|
||||
private const val MESSAGES_LOG_NAME = "messages"
|
||||
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
|
||||
|
||||
internal fun create(
|
||||
sendManuallyPumped: Boolean,
|
||||
servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
|
||||
messagesInFlight: ReusableLatch = ReusableLatch()): InMemoryMessagingNetwork {
|
||||
@CordaInternal
|
||||
internal fun create(sendManuallyPumped: Boolean,
|
||||
servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
|
||||
messagesInFlight: ReusableLatch = ReusableLatch()): InMemoryMessagingNetwork {
|
||||
return InMemoryMessagingNetwork(sendManuallyPumped, servicePeerAllocationStrategy, messagesInFlight)
|
||||
}
|
||||
}
|
||||
|
||||
private var counter = 0 // -1 means stopped.
|
||||
private val handleEndpointMap = HashMap<PeerHandle, InMemoryMessaging>()
|
||||
private val handleEndpointMap = HashMap<PeerHandle, MockNodeMessagingService>()
|
||||
|
||||
/** A class which represents a message being transferred from sender to recipients, within the [InMemoryMessageNetwork]. **/
|
||||
@CordaSerializable
|
||||
@ -117,40 +108,35 @@ class InMemoryMessagingNetwork private constructor(
|
||||
/** A stream of (sender, message, recipients) triples containing messages once they have been received. */
|
||||
val receivedMessages: Observable<MessageTransfer>
|
||||
get() = _receivedMessages
|
||||
internal val endpoints: List<InternalMockMessagingService> @Synchronized get() = handleEndpointMap.values.toList()
|
||||
internal val endpoints: List<MockNodeMessagingService>
|
||||
@CordaInternal
|
||||
@Synchronized
|
||||
get() = handleEndpointMap.values.toList()
|
||||
/** Get a [List] of all the [MockMessagingService] endpoints **/
|
||||
val endpointsExternal: List<MockMessagingService> @Synchronized get() = handleEndpointMap.values.map { MockMessagingService.createMockMessagingService(it) }.toList()
|
||||
val endpointsExternal: List<MockMessagingService>
|
||||
@Synchronized
|
||||
get() = handleEndpointMap.values.map { MockMessagingService.createMockMessagingService(it) }.toList()
|
||||
|
||||
/**
|
||||
* Creates a node at the given address: useful if you want to recreate a node to simulate a restart.
|
||||
*
|
||||
* @param manuallyPumped if set to true, then you are expected to call [InMemoryMessaging.pumpReceive]
|
||||
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
|
||||
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
|
||||
* executor.
|
||||
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
|
||||
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
|
||||
*/
|
||||
internal fun createNodeWithID(
|
||||
manuallyPumped: Boolean,
|
||||
id: Int,
|
||||
executor: AffinityExecutor,
|
||||
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK")
|
||||
): InternalMockMessagingService {
|
||||
val peerHandle = PeerHandle(id, description)
|
||||
peersMapping[peerHandle.name] = peerHandle // Assume that the same name - the same entity in MockNetwork.
|
||||
@CordaInternal
|
||||
internal fun getPeer(name: CordaX500Name): PeerHandle? = peersMapping[name]
|
||||
|
||||
@CordaInternal
|
||||
internal fun initPeer(messagingService: MockNodeMessagingService): MockNodeMessagingService? {
|
||||
peersMapping[messagingService.myAddress.name] = messagingService.myAddress // Assume that the same name - the same entity in MockNetwork.
|
||||
return synchronized(this) {
|
||||
val node = InMemoryMessaging(manuallyPumped, peerHandle, executor)
|
||||
val oldNode = handleEndpointMap.put(peerHandle, node)
|
||||
if (oldNode != null) {
|
||||
node.inheritPendingRedelivery(oldNode)
|
||||
}
|
||||
node
|
||||
handleEndpointMap.put(messagingService.myAddress, messagingService)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun onNotaryIdentity(node: InternalMockMessagingService, notaryService: PartyAndCertificate?) {
|
||||
val peerHandle = (node as InMemoryMessaging).peerHandle
|
||||
@CordaInternal
|
||||
internal fun onMessageTransfer(transfer: MessageTransfer) {
|
||||
_receivedMessages.onNext(transfer)
|
||||
messagesInFlight.countDown()
|
||||
}
|
||||
|
||||
@CordaInternal
|
||||
internal fun addNotaryIdentity(node: MockNodeMessagingService, notaryService: PartyAndCertificate?) {
|
||||
val peerHandle = node.myAddress
|
||||
notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle }
|
||||
val serviceHandles = notaryService?.let { listOf(DistributedServiceHandle(it.party)) }
|
||||
?: emptyList() //TODO only notary can be distributed?
|
||||
@ -171,22 +157,31 @@ class InMemoryMessagingNetwork private constructor(
|
||||
private var latencyCalculator: LatencyCalculator? = null
|
||||
private val timer = Timer()
|
||||
|
||||
@Synchronized
|
||||
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
|
||||
messagesInFlight.countUp()
|
||||
messageSendQueue += MessageTransfer.createMessageTransfer(from.myAddress, message, recipients)
|
||||
@CordaInternal
|
||||
internal fun msgSend(from: MockNodeMessagingService, message: Message, recipients: MessageRecipients) {
|
||||
synchronized(this) {
|
||||
messagesInFlight.countUp()
|
||||
messageSendQueue += MessageTransfer.createMessageTransfer(from.myAddress, message, recipients)
|
||||
}
|
||||
if (!sendManuallyPumped) {
|
||||
pumpSend(false)
|
||||
}
|
||||
}
|
||||
|
||||
@CordaInternal
|
||||
@Synchronized
|
||||
private fun netNodeHasShutdown(peerHandle: PeerHandle) {
|
||||
internal fun netNodeHasShutdown(peerHandle: PeerHandle) {
|
||||
val endpoint = handleEndpointMap[peerHandle]
|
||||
if (!(endpoint?.hasPendingDeliveries() ?: false)) {
|
||||
if (endpoint?.hasPendingDeliveries() != true) {
|
||||
handleEndpointMap.remove(peerHandle)
|
||||
}
|
||||
}
|
||||
|
||||
@CordaInternal
|
||||
@Synchronized
|
||||
private fun getQueueForPeerHandle(recipients: PeerHandle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
|
||||
internal fun getQueueForPeerHandle(recipients: PeerHandle): LinkedBlockingQueue<MessageTransfer> {
|
||||
return messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
private fun getQueuesForServiceHandle(recipients: DistributedServiceHandle): List<LinkedBlockingQueue<MessageTransfer>> {
|
||||
@ -286,30 +281,6 @@ class InMemoryMessagingNetwork private constructor(
|
||||
return transfer
|
||||
}
|
||||
|
||||
/**
|
||||
* When a new message handler is added, this implies we have started a new node. The add handler logic uses this to
|
||||
* push back any un-acknowledged messages for this peer onto the head of the queue (rather than the tail) to maintain message
|
||||
* delivery order. We push them back because their consumption was not complete and a restarted node would
|
||||
* see them re-delivered if this was Artemis.
|
||||
*/
|
||||
@Synchronized
|
||||
private fun unPopMessages(transfers: Collection<MessageTransfer>, us: PeerHandle) {
|
||||
messageReceiveQueues.compute(us) { _, existing ->
|
||||
if (existing == null) {
|
||||
LinkedBlockingQueue<MessageTransfer>().apply {
|
||||
addAll(transfers)
|
||||
}
|
||||
} else {
|
||||
existing.apply {
|
||||
val drained = mutableListOf<MessageTransfer>()
|
||||
existing.drainTo(drained)
|
||||
existing.addAll(transfers)
|
||||
existing.addAll(drained)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun pumpSendInternal(transfer: MessageTransfer) {
|
||||
when (transfer.recipients) {
|
||||
is PeerHandle -> getQueueForPeerHandle(transfer.recipients).add(transfer)
|
||||
@ -321,35 +292,25 @@ class InMemoryMessagingNetwork private constructor(
|
||||
is AllPossibleRecipients -> {
|
||||
// This means all possible recipients _that the network knows about at the time_, not literally everyone
|
||||
// who joins into the indefinite future.
|
||||
for (handle in handleEndpointMap.keys)
|
||||
getQueueForPeerHandle(handle).add(transfer)
|
||||
synchronized(this) {
|
||||
for (handle in handleEndpointMap.keys) {
|
||||
getQueueForPeerHandle(handle).add(transfer)
|
||||
}
|
||||
}
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unknown type of recipient handle")
|
||||
}
|
||||
_sentMessages.onNext(transfer)
|
||||
}
|
||||
|
||||
private data class InMemoryReceivedMessage(override val topic: String,
|
||||
override val data: ByteSequence,
|
||||
override val platformVersion: Int,
|
||||
override val uniqueMessageId: DeduplicationId,
|
||||
override val debugTimestamp: Instant,
|
||||
override val peer: CordaX500Name,
|
||||
override val senderUUID: String? = null,
|
||||
override val senderSeqNo: Long? = null,
|
||||
/** Note this flag is never set in the in memory network. */
|
||||
override val isSessionInit: Boolean = false) : ReceivedMessage {
|
||||
|
||||
override val additionalHeaders: Map<String, String> = emptyMap()
|
||||
}
|
||||
|
||||
/**
|
||||
* A class that provides an abstraction over the nodes' messaging service that also contains the ability to
|
||||
* receive messages from the queue for testing purposes.
|
||||
*/
|
||||
class MockMessagingService private constructor(private val messagingService: InternalMockMessagingService) {
|
||||
class MockMessagingService private constructor(private val messagingService: MockNodeMessagingService) {
|
||||
companion object {
|
||||
internal fun createMockMessagingService(messagingService: InternalMockMessagingService): MockMessagingService {
|
||||
@CordaInternal
|
||||
internal fun createMockMessagingService(messagingService: MockNodeMessagingService): MockMessagingService {
|
||||
return MockMessagingService(messagingService)
|
||||
}
|
||||
}
|
||||
@ -362,197 +323,4 @@ class InMemoryMessagingNetwork private constructor(
|
||||
*/
|
||||
fun pumpReceive(block: Boolean): InMemoryMessagingNetwork.MessageTransfer? = messagingService.pumpReceive(block)
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
private inner class InMemoryMessaging(private val manuallyPumped: Boolean,
|
||||
val peerHandle: PeerHandle,
|
||||
private val executor: AffinityExecutor) : SingletonSerializeAsToken(), InternalMockMessagingService {
|
||||
private inner class Handler(val topicSession: String, val callback: MessageHandler) : MessageHandlerRegistration
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private inner class InnerState {
|
||||
val handlers: MutableList<Handler> = ArrayList()
|
||||
val pendingRedelivery = LinkedHashSet<MessageTransfer>()
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val processedMessages: MutableSet<DeduplicationId> = Collections.synchronizedSet(HashSet<DeduplicationId>())
|
||||
|
||||
override val myAddress: PeerHandle get() = peerHandle
|
||||
override val ourSenderUUID: String = UUID.randomUUID().toString()
|
||||
|
||||
private val backgroundThread = if (manuallyPumped) null else
|
||||
thread(isDaemon = true, name = "In-memory message dispatcher") {
|
||||
while (!Thread.currentThread().isInterrupted) {
|
||||
try {
|
||||
pumpReceiveInternal(true)
|
||||
} catch (e: InterruptedException) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
|
||||
return when (partyInfo) {
|
||||
is PartyInfo.SingleNode -> peersMapping[partyInfo.party.name]
|
||||
?: throw IllegalArgumentException("No StartedMockNode for party ${partyInfo.party.name}")
|
||||
is PartyInfo.DistributedNode -> DistributedServiceHandle(partyInfo.party)
|
||||
}
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, callback: MessageHandler): MessageHandlerRegistration {
|
||||
check(running)
|
||||
val (handler, transfers) = state.locked {
|
||||
val handler = Handler(topic, callback).apply { handlers.add(this) }
|
||||
val pending = ArrayList<MessageTransfer>()
|
||||
pending.addAll(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
Pair(handler, pending)
|
||||
}
|
||||
|
||||
unPopMessages(transfers, peerHandle)
|
||||
return handler
|
||||
}
|
||||
|
||||
fun inheritPendingRedelivery(other: InMemoryMessaging) {
|
||||
state.locked {
|
||||
pendingRedelivery.addAll(other.state.locked { pendingRedelivery })
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
check(running)
|
||||
state.locked { check(handlers.remove(registration as Handler)) }
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
check(running)
|
||||
msgSend(this, message, target)
|
||||
if (!sendManuallyPumped) {
|
||||
pumpSend(false)
|
||||
}
|
||||
}
|
||||
|
||||
override fun send(addressedMessages: List<MessagingService.AddressedMessage>) {
|
||||
for ((message, target, sequenceKey) in addressedMessages) {
|
||||
send(message, target, sequenceKey)
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
if (backgroundThread != null) {
|
||||
backgroundThread.interrupt()
|
||||
backgroundThread.join()
|
||||
}
|
||||
running = false
|
||||
netNodeHasShutdown(peerHandle)
|
||||
}
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topic: String, data: ByteArray, deduplicationId: SenderDeduplicationId, additionalHeaders: Map<String, String>): Message {
|
||||
return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId.deduplicationId, senderUUID = deduplicationId.senderUUID)
|
||||
}
|
||||
|
||||
/**
|
||||
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
|
||||
* is true, waits until one has been provided on a different thread via send. If block is false, the return
|
||||
* result indicates whether a message was delivered or not.
|
||||
*
|
||||
* @return the message that was processed, if any in this round.
|
||||
*/
|
||||
override fun pumpReceive(block: Boolean): MessageTransfer? {
|
||||
check(manuallyPumped)
|
||||
check(running)
|
||||
executor.flush()
|
||||
try {
|
||||
return pumpReceiveInternal(block)
|
||||
} finally {
|
||||
executor.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers
|
||||
* are placed into `pendingRedelivery` to try again later.
|
||||
*
|
||||
* @param block if this should block until a message it can process.
|
||||
*/
|
||||
private fun getNextQueue(q: LinkedBlockingQueue<MessageTransfer>, block: Boolean): Pair<MessageTransfer, List<Handler>>? {
|
||||
var deliverTo: List<Handler>? = null
|
||||
// Pop transfers off the queue until we run out (and are not blocking), or find something we can process
|
||||
while (deliverTo == null) {
|
||||
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
||||
deliverTo = state.locked {
|
||||
val matchingHandlers = handlers.filter { it.topicSession.isBlank() || transfer.message.topic == it.topicSession }
|
||||
if (matchingHandlers.isEmpty()) {
|
||||
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
|
||||
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
|
||||
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
|
||||
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
||||
// least sometimes.
|
||||
log.warn("Message to ${transfer.message.topic} could not be delivered")
|
||||
pendingRedelivery.add(transfer)
|
||||
null
|
||||
} else {
|
||||
matchingHandlers
|
||||
}
|
||||
}
|
||||
if (deliverTo != null) {
|
||||
return Pair(transfer, deliverTo)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
|
||||
val q = getQueueForPeerHandle(peerHandle)
|
||||
val (transfer, deliverTo) = getNextQueue(q, block) ?: return null
|
||||
if (transfer.message.uniqueMessageId !in processedMessages) {
|
||||
executor.execute {
|
||||
for (handler in deliverTo) {
|
||||
try {
|
||||
val receivedMessage = transfer.toReceivedMessage()
|
||||
state.locked { pendingRedelivery.add(transfer) }
|
||||
handler.callback(receivedMessage, handler, InMemoryDeduplicationHandler(receivedMessage, transfer))
|
||||
} catch (e: Exception) {
|
||||
log.error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
}
|
||||
}
|
||||
_receivedMessages.onNext(transfer)
|
||||
messagesInFlight.countDown()
|
||||
}
|
||||
} else {
|
||||
log.info("Drop duplicate message ${transfer.message.uniqueMessageId}")
|
||||
}
|
||||
return transfer
|
||||
}
|
||||
|
||||
private fun MessageTransfer.toReceivedMessage(): ReceivedMessage = InMemoryReceivedMessage(
|
||||
message.topic,
|
||||
OpaqueBytes(message.data.bytes.copyOf()), // Kryo messes with the buffer so give each client a unique copy
|
||||
1,
|
||||
message.uniqueMessageId,
|
||||
message.debugTimestamp,
|
||||
sender.name)
|
||||
|
||||
private inner class InMemoryDeduplicationHandler(override val receivedMessage: ReceivedMessage, val transfer: MessageTransfer) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
|
||||
override val externalCause: ExternalEvent
|
||||
get() = this
|
||||
override val deduplicationHandler: DeduplicationHandler
|
||||
get() = this
|
||||
|
||||
override fun afterDatabaseTransaction() {
|
||||
this@InMemoryMessaging.state.locked { pendingRedelivery.remove(transfer) }
|
||||
}
|
||||
|
||||
override fun insideDatabaseTransaction() {
|
||||
processedMessages += transfer.message.uniqueMessageId
|
||||
}
|
||||
}
|
||||
|
||||
fun hasPendingDeliveries(): Boolean = state.locked { pendingRedelivery.isNotEmpty() }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ class StartedMockNode private constructor(private val node: TestStartedNode) {
|
||||
* @return the message that was processed, if any in this round.
|
||||
*/
|
||||
fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
return (node.network as InternalMockMessagingService).pumpReceive(block)
|
||||
return node.network.pumpReceive(block)
|
||||
}
|
||||
|
||||
/** Returns the currently live flows of type [flowClass], and their corresponding result future. */
|
||||
|
@ -16,7 +16,7 @@ import net.corda.node.services.statemachine.DeduplicationId
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* An implementation of [Message] for in memory messaging by the test [InMemoryMessagingNetwork].
|
||||
* An implementation of [Message] for in memory messaging by the test [MockNodeMessagingService].
|
||||
*/
|
||||
data class InMemoryMessage(override val topic: String,
|
||||
override val data: ByteSequence,
|
||||
@ -27,4 +27,4 @@ data class InMemoryMessage(override val topic: String,
|
||||
override val additionalHeaders: Map<String, String> = emptyMap()
|
||||
|
||||
override fun toString() = "$topic#${String(data.bytes)}"
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,10 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.hours
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.cordapp.CordappLoader
|
||||
import net.corda.node.internal.AbstractNode
|
||||
@ -46,6 +49,7 @@ import net.corda.node.services.api.StartedNodeServices
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.node.services.keys.KeyManagementServiceInternal
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
@ -80,10 +84,6 @@ import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Vendor")
|
||||
|
||||
fun TestStartedNode.pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
return (network as InternalMockMessagingService).pumpReceive(block)
|
||||
}
|
||||
|
||||
data class MockNodeArgs(
|
||||
val config: NodeConfiguration,
|
||||
val network: InternalMockNetwork,
|
||||
@ -119,12 +119,23 @@ interface TestStartedNode {
|
||||
val smm: StateMachineManager
|
||||
val attachments: NodeAttachmentService
|
||||
val rpcOps: CordaRPCOps
|
||||
val network: MessagingService
|
||||
val network: MockNodeMessagingService
|
||||
val database: CordaPersistence
|
||||
val notaryService: NotaryService?
|
||||
|
||||
fun dispose() = internals.stop()
|
||||
|
||||
fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
return network.pumpReceive(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach a [MessagingServiceSpy] to the [InternalMockNetwork.MockNode] allowing interception and modification of messages.
|
||||
*/
|
||||
fun setMessagingServiceSpy(spy: MessagingServiceSpy) {
|
||||
internals.setMessagingServiceSpy(spy)
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
|
||||
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].
|
||||
@ -132,10 +143,10 @@ interface TestStartedNode {
|
||||
*/
|
||||
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>): Observable<T>
|
||||
|
||||
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
|
||||
flowFactory: InitiatedFlowFactory<F>,
|
||||
initiatedFlowClass: Class<F>,
|
||||
track: Boolean): Observable<F>
|
||||
fun <F : FlowLogic<*>> registerFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
|
||||
flowFactory: InitiatedFlowFactory<F>,
|
||||
initiatedFlowClass: Class<F>,
|
||||
track: Boolean): Observable<F>
|
||||
}
|
||||
|
||||
open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParameters(),
|
||||
@ -155,10 +166,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
require(networkParameters.notaries.isEmpty()) { "Define notaries using notarySpecs" }
|
||||
}
|
||||
|
||||
private companion object {
|
||||
private val logger = loggerFor<InternalMockNetwork>()
|
||||
}
|
||||
|
||||
var nextNodeId = 0
|
||||
private set
|
||||
private val filesystem = Jimfs.newFileSystem(unix())
|
||||
@ -292,12 +299,15 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
args.network.getServerThread(args.id),
|
||||
args.network.busyLatch
|
||||
) {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
/** The actual [StartedNode] implementation created by this node */
|
||||
/** The actual [TestStartedNode] implementation created by this node */
|
||||
private class TestStartedNodeImpl(
|
||||
override val internals: MockNode,
|
||||
override val attachments: NodeAttachmentService,
|
||||
override val network: MessagingService,
|
||||
override val network: MockNodeMessagingService,
|
||||
override val services: StartedNodeServices,
|
||||
override val info: NodeInfo,
|
||||
override val smm: StateMachineManager,
|
||||
@ -305,7 +315,7 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
override val rpcOps: CordaRPCOps,
|
||||
override val notaryService: NotaryService?) : TestStartedNode {
|
||||
|
||||
override fun <F : FlowLogic<*>> internalRegisterFlowFactory(
|
||||
override fun <F : FlowLogic<*>> registerFlowFactory(
|
||||
initiatingFlowClass: Class<out FlowLogic<*>>,
|
||||
flowFactory: InitiatedFlowFactory<F>,
|
||||
initiatedFlowClass: Class<F>,
|
||||
@ -318,25 +328,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
internals.registerInitiatedFlow(smm, initiatedFlowClass)
|
||||
}
|
||||
|
||||
override fun createStartedNode(nodeInfo: NodeInfo, rpcOps: CordaRPCOps, notaryService: NotaryService?): TestStartedNode =
|
||||
TestStartedNodeImpl(
|
||||
this,
|
||||
attachments,
|
||||
network,
|
||||
object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter { },
|
||||
nodeInfo,
|
||||
smm,
|
||||
database,
|
||||
rpcOps,
|
||||
notaryService
|
||||
)
|
||||
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
}
|
||||
|
||||
val mockNet = args.network
|
||||
val id = args.id
|
||||
init {
|
||||
require(id >= 0) { "Node ID must be zero or positive, was passed: $id" }
|
||||
}
|
||||
private val entropyRoot = args.entropyRoot
|
||||
var counter = entropyRoot
|
||||
override val log get() = staticLog
|
||||
@ -353,9 +349,23 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
|
||||
override val started: TestStartedNode? get() = uncheckedCast(super.started)
|
||||
|
||||
override fun createStartedNode(nodeInfo: NodeInfo, rpcOps: CordaRPCOps, notaryService: NotaryService?): TestStartedNode {
|
||||
return TestStartedNodeImpl(
|
||||
this,
|
||||
attachments,
|
||||
network as MockNodeMessagingService,
|
||||
object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter { },
|
||||
nodeInfo,
|
||||
smm,
|
||||
database,
|
||||
rpcOps,
|
||||
notaryService
|
||||
)
|
||||
}
|
||||
|
||||
override fun start(): TestStartedNode {
|
||||
mockNet.networkParametersCopier.install(configuration.baseDirectory)
|
||||
return super.start().also { advertiseNodeToNetwork(it) }
|
||||
return super.start().also(::advertiseNodeToNetwork)
|
||||
}
|
||||
|
||||
private fun advertiseNodeToNetwork(newNode: TestStartedNode) {
|
||||
@ -367,28 +377,20 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
}
|
||||
|
||||
override fun makeMessagingService(): MessagingService {
|
||||
require(id >= 0) { "Node ID must be zero or positive, was passed: $id" }
|
||||
// TODO AbstractNode is forced to call this method in start(), and not in the c'tor, because the mockNet
|
||||
// c'tor parameter isn't available. We need to be able to return a InternalMockMessagingService
|
||||
// here that can be populated properly in startMessagingService.
|
||||
return mockNet.messagingNetwork.createNodeWithID(
|
||||
!mockNet.threadPerNode,
|
||||
id,
|
||||
serverThread,
|
||||
configuration.myLegalName
|
||||
).closeOnStop()
|
||||
override fun makeMessagingService(): MockNodeMessagingService {
|
||||
return MockNodeMessagingService(configuration, serverThread).closeOnStop()
|
||||
}
|
||||
|
||||
override fun startMessagingService(rpcOps: RPCOps,
|
||||
nodeInfo: NodeInfo,
|
||||
myNotaryIdentity: PartyAndCertificate?,
|
||||
networkParameters: NetworkParameters) {
|
||||
mockNet.messagingNetwork.onNotaryIdentity(network as InternalMockMessagingService, myNotaryIdentity)
|
||||
(network as MockNodeMessagingService).start(mockNet.messagingNetwork, !mockNet.threadPerNode, id, myNotaryIdentity)
|
||||
}
|
||||
|
||||
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
|
||||
network = messagingServiceSpy
|
||||
fun setMessagingServiceSpy(spy: MessagingServiceSpy) {
|
||||
spy._messagingService = network
|
||||
(network as MockNodeMessagingService).spy = spy
|
||||
}
|
||||
|
||||
override fun makeKeyManagementService(identityService: IdentityService): KeyManagementServiceInternal {
|
||||
@ -562,13 +564,15 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
}
|
||||
|
||||
open class MessagingServiceSpy(val messagingService: MessagingService) : MessagingService by messagingService
|
||||
abstract class MessagingServiceSpy {
|
||||
internal var _messagingService: MessagingService? = null
|
||||
set(value) {
|
||||
check(field == null) { "Spy has already been attached to a node" }
|
||||
field = value
|
||||
}
|
||||
val messagingService: MessagingService get() = checkNotNull(_messagingService) { "Spy has not been attached to a node" }
|
||||
|
||||
/**
|
||||
* Attach a [MessagingServiceSpy] to the [InternalMockNetwork.MockNode] allowing interception and modification of messages.
|
||||
*/
|
||||
fun TestStartedNode.setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
|
||||
internals.setMessagingServiceSpy(messagingServiceSpy)
|
||||
abstract fun send(message: Message, target: MessageRecipients, sequenceKey: Any)
|
||||
}
|
||||
|
||||
private fun mockNodeConfiguration(): NodeConfiguration {
|
||||
@ -595,4 +599,4 @@ private fun mockNodeConfiguration(): NodeConfiguration {
|
||||
useMultiThreadedSMM = false
|
||||
)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,283 @@
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
@ThreadSafe
|
||||
class MockNodeMessagingService(private val configuration: NodeConfiguration,
|
||||
private val executor: AffinityExecutor) : SingletonSerializeAsToken(), MessagingService {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private inner class Handler(val topicSession: String, val callback: MessageHandler) : MessageHandlerRegistration
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private inner class InnerState {
|
||||
val handlers: MutableList<Handler> = ArrayList()
|
||||
val pendingRedelivery = LinkedHashSet<InMemoryMessagingNetwork.MessageTransfer>()
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val processedMessages: MutableSet<DeduplicationId> = Collections.synchronizedSet(HashSet<DeduplicationId>())
|
||||
|
||||
override val ourSenderUUID: String = UUID.randomUUID().toString()
|
||||
|
||||
private var _myAddress: InMemoryMessagingNetwork.PeerHandle? = null
|
||||
override val myAddress: InMemoryMessagingNetwork.PeerHandle get() = checkNotNull(_myAddress) { "Not started" }
|
||||
|
||||
private lateinit var network: InMemoryMessagingNetwork
|
||||
private var backgroundThread: Thread? = null
|
||||
|
||||
var spy: MessagingServiceSpy? = null
|
||||
|
||||
/**
|
||||
* @param manuallyPumped if set to true, then you are expected to call [MockNodeMessagingService.pumpReceive]
|
||||
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
|
||||
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
|
||||
* executor.
|
||||
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
|
||||
*/
|
||||
fun start(network: InMemoryMessagingNetwork, manuallyPumped: Boolean, id: Int, notaryService: PartyAndCertificate?) {
|
||||
val peerHandle = InMemoryMessagingNetwork.PeerHandle(id, configuration.myLegalName)
|
||||
|
||||
this.network = network
|
||||
_myAddress = peerHandle
|
||||
|
||||
val oldNode = network.initPeer(this)
|
||||
if (oldNode != null) {
|
||||
inheritPendingRedelivery(oldNode)
|
||||
}
|
||||
|
||||
if (!manuallyPumped) {
|
||||
backgroundThread = thread(isDaemon = true, name = "In-memory message dispatcher") {
|
||||
while (!Thread.currentThread().isInterrupted) {
|
||||
try {
|
||||
pumpReceiveInternal(true)
|
||||
} catch (e: InterruptedException) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
network.addNotaryIdentity(this, notaryService)
|
||||
}
|
||||
|
||||
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
|
||||
return when (partyInfo) {
|
||||
is PartyInfo.SingleNode -> network.getPeer(partyInfo.party.name)
|
||||
?: throw IllegalArgumentException("No StartedMockNode for party ${partyInfo.party.name}")
|
||||
is PartyInfo.DistributedNode -> InMemoryMessagingNetwork.DistributedServiceHandle(partyInfo.party)
|
||||
}
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, callback: MessageHandler): MessageHandlerRegistration {
|
||||
check(running)
|
||||
val (handler, transfers) = state.locked {
|
||||
val handler = Handler(topic, callback).apply { handlers.add(this) }
|
||||
val pending = ArrayList<InMemoryMessagingNetwork.MessageTransfer>()
|
||||
pending.addAll(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
Pair(handler, pending)
|
||||
}
|
||||
|
||||
unPopMessages(transfers)
|
||||
return handler
|
||||
}
|
||||
|
||||
/**
|
||||
* When a new message handler is added, this implies we have started a new node. The add handler logic uses this to
|
||||
* push back any un-acknowledged messages for this peer onto the head of the queue (rather than the tail) to maintain message
|
||||
* delivery order. We push them back because their consumption was not complete and a restarted node would
|
||||
* see them re-delivered if this was Artemis.
|
||||
*/
|
||||
private fun unPopMessages(transfers: Collection<InMemoryMessagingNetwork.MessageTransfer>) {
|
||||
val messageQueue = network.getQueueForPeerHandle(myAddress)
|
||||
val drained = ArrayList<InMemoryMessagingNetwork.MessageTransfer>().apply { messageQueue.drainTo(this) }
|
||||
messageQueue.addAll(transfers)
|
||||
messageQueue.addAll(drained)
|
||||
}
|
||||
|
||||
private fun inheritPendingRedelivery(other: MockNodeMessagingService) {
|
||||
state.locked {
|
||||
pendingRedelivery.addAll(other.state.locked { pendingRedelivery })
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
check(running)
|
||||
state.locked { check(handlers.remove(registration as Handler)) }
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
|
||||
check(running)
|
||||
val spy = this.spy
|
||||
if (spy != null) {
|
||||
this.spy = null
|
||||
try {
|
||||
spy.send(message, target, sequenceKey)
|
||||
} finally {
|
||||
this.spy = spy
|
||||
}
|
||||
} else {
|
||||
network.msgSend(this, message, target)
|
||||
}
|
||||
}
|
||||
|
||||
override fun send(addressedMessages: List<MessagingService.AddressedMessage>) {
|
||||
for ((message, target, sequenceKey) in addressedMessages) {
|
||||
send(message, target, sequenceKey)
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
backgroundThread?.let {
|
||||
it.interrupt()
|
||||
it.join()
|
||||
}
|
||||
running = false
|
||||
network.netNodeHasShutdown(myAddress)
|
||||
}
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topic: String, data: ByteArray, deduplicationId: SenderDeduplicationId, additionalHeaders: Map<String, String>): Message {
|
||||
return InMemoryMessage(topic, OpaqueBytes(data), deduplicationId.deduplicationId, senderUUID = deduplicationId.senderUUID)
|
||||
}
|
||||
|
||||
/**
|
||||
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
|
||||
* is true, waits until one has been provided on a different thread via send. If block is false, the return
|
||||
* result indicates whether a message was delivered or not.
|
||||
*
|
||||
* @return the message that was processed, if any in this round.
|
||||
*/
|
||||
fun pumpReceive(block: Boolean): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
check(backgroundThread == null)
|
||||
check(running)
|
||||
executor.flush()
|
||||
try {
|
||||
return pumpReceiveInternal(block)
|
||||
} finally {
|
||||
executor.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers
|
||||
* are placed into `pendingRedelivery` to try again later.
|
||||
*
|
||||
* @param block if this should block until a message it can process.
|
||||
*/
|
||||
private fun getNextQueue(q: LinkedBlockingQueue<InMemoryMessagingNetwork.MessageTransfer>, block: Boolean): Pair<InMemoryMessagingNetwork.MessageTransfer, List<Handler>>? {
|
||||
var deliverTo: List<Handler>? = null
|
||||
// Pop transfers off the queue until we run out (and are not blocking), or find something we can process
|
||||
while (deliverTo == null) {
|
||||
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
||||
deliverTo = state.locked {
|
||||
val matchingHandlers = handlers.filter { it.topicSession.isBlank() || transfer.message.topic == it.topicSession }
|
||||
if (matchingHandlers.isEmpty()) {
|
||||
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
|
||||
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
|
||||
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
|
||||
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
||||
// least sometimes.
|
||||
log.warn("Message to ${transfer.message.topic} could not be delivered")
|
||||
pendingRedelivery.add(transfer)
|
||||
null
|
||||
} else {
|
||||
matchingHandlers
|
||||
}
|
||||
}
|
||||
if (deliverTo != null) {
|
||||
return Pair(transfer, deliverTo)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun pumpReceiveInternal(block: Boolean): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
val q = network.getQueueForPeerHandle(myAddress)
|
||||
val (transfer, deliverTo) = getNextQueue(q, block) ?: return null
|
||||
if (transfer.message.uniqueMessageId !in processedMessages) {
|
||||
executor.execute {
|
||||
for (handler in deliverTo) {
|
||||
try {
|
||||
val receivedMessage = transfer.toReceivedMessage()
|
||||
state.locked { pendingRedelivery.add(transfer) }
|
||||
handler.callback(receivedMessage, handler, InMemoryDeduplicationHandler(receivedMessage, transfer))
|
||||
} catch (e: Exception) {
|
||||
log.error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
}
|
||||
}
|
||||
network.onMessageTransfer(transfer)
|
||||
}
|
||||
} else {
|
||||
log.info("Drop duplicate message ${transfer.message.uniqueMessageId}")
|
||||
}
|
||||
return transfer
|
||||
}
|
||||
|
||||
private fun InMemoryMessagingNetwork.MessageTransfer.toReceivedMessage(): ReceivedMessage {
|
||||
return InMemoryReceivedMessage(
|
||||
message.topic,
|
||||
OpaqueBytes(message.data.bytes.copyOf()), // Kryo messes with the buffer so give each client a unique copy
|
||||
1,
|
||||
message.uniqueMessageId,
|
||||
message.debugTimestamp,
|
||||
sender.name
|
||||
)
|
||||
}
|
||||
|
||||
private data class InMemoryReceivedMessage(override val topic: String,
|
||||
override val data: ByteSequence,
|
||||
override val platformVersion: Int,
|
||||
override val uniqueMessageId: DeduplicationId,
|
||||
override val debugTimestamp: Instant,
|
||||
override val peer: CordaX500Name,
|
||||
override val senderUUID: String? = null,
|
||||
override val senderSeqNo: Long? = null,
|
||||
/** Note this flag is never set in the in memory network. */
|
||||
override val isSessionInit: Boolean = false) : ReceivedMessage {
|
||||
|
||||
override val additionalHeaders: Map<String, String> = emptyMap()
|
||||
}
|
||||
|
||||
private inner class InMemoryDeduplicationHandler(override val receivedMessage: ReceivedMessage, val transfer: InMemoryMessagingNetwork.MessageTransfer) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
|
||||
override val externalCause: ExternalEvent
|
||||
get() = this
|
||||
override val deduplicationHandler: DeduplicationHandler
|
||||
get() = this
|
||||
|
||||
override fun afterDatabaseTransaction() {
|
||||
this@MockNodeMessagingService.state.locked { pendingRedelivery.remove(transfer) }
|
||||
}
|
||||
|
||||
override fun insideDatabaseTransaction() {
|
||||
processedMessages += transfer.message.uniqueMessageId
|
||||
}
|
||||
}
|
||||
|
||||
fun hasPendingDeliveries(): Boolean = state.locked { pendingRedelivery.isNotEmpty() }
|
||||
}
|
@ -10,11 +10,105 @@
|
||||
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import net.corda.core.messaging.AllPossibleRecipients
|
||||
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class InternalMockNetworkTests {
|
||||
lateinit var mockNet: InternalMockNetwork
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
if (this::mockNet.isInitialized) {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun basics() {
|
||||
mockNet = InternalMockNetwork()
|
||||
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
val node3 = mockNet.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
var finalDelivery: Message? = null
|
||||
node2.network.addMessageHandler("test.topic") { msg, _, _ ->
|
||||
node2.network.send(msg, node3.network.myAddress)
|
||||
}
|
||||
node3.network.addMessageHandler("test.topic") { msg, _, _ ->
|
||||
finalDelivery = msg
|
||||
}
|
||||
|
||||
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
|
||||
node1.network.send(node1.network.createMessage("test.topic", data = bits), node2.network.myAddress)
|
||||
|
||||
mockNet.runNetwork(rounds = 1)
|
||||
|
||||
assertTrue(Arrays.equals(finalDelivery!!.data.bytes, bits))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun broadcast() {
|
||||
mockNet = InternalMockNetwork()
|
||||
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
val node3 = mockNet.createNode()
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
|
||||
var counter = 0
|
||||
listOf(node1, node2, node3).forEach { it.network.addMessageHandler("test.topic") { _, _, _ -> counter++ } }
|
||||
node1.network.send(node2.network.createMessage("test.topic", data = bits), rigorousMock<AllPossibleRecipients>())
|
||||
mockNet.runNetwork(rounds = 1)
|
||||
assertEquals(3, counter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that unhandled messages in the received queue are skipped and the next message processed, rather than
|
||||
* causing processing to return null as if there was no message.
|
||||
*/
|
||||
@Test
|
||||
fun `skip unhandled messages`() {
|
||||
mockNet = InternalMockNetwork()
|
||||
|
||||
val node1 = mockNet.createNode()
|
||||
val node2 = mockNet.createNode()
|
||||
var received = 0
|
||||
|
||||
node1.network.addMessageHandler("valid_message") { _, _, _ ->
|
||||
received++
|
||||
}
|
||||
|
||||
val invalidMessage = node2.network.createMessage("invalid_message", data = ByteArray(1))
|
||||
val validMessage = node2.network.createMessage("valid_message", data = ByteArray(1))
|
||||
node2.network.send(invalidMessage, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(0, received)
|
||||
|
||||
node2.network.send(validMessage, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(1, received)
|
||||
|
||||
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
|
||||
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
|
||||
val invalidMessage2 = node2.network.createMessage("invalid_message", data = ByteArray(1))
|
||||
val validMessage2 = node2.network.createMessage("valid_message", data = ByteArray(1))
|
||||
node2.network.send(invalidMessage2, node1.network.myAddress)
|
||||
node2.network.send(validMessage2, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(2, received)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `does not leak serialization env if init fails`() {
|
||||
val e = Exception("didn't work")
|
||||
@ -25,4 +119,4 @@ class InternalMockNetworkTests {
|
||||
}.isSameAs(e)
|
||||
assertThatThrownBy { effectiveSerializationEnv }.isInstanceOf(IllegalStateException::class.java)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,6 +122,7 @@ public class InteractiveShellJavaTest {
|
||||
|
||||
private InMemoryIdentityService ids = new InMemoryIdentityService(Lists.newArrayList(megaCorp.getIdentity()), InternalTestConstantsKt.getDEV_ROOT_CA().getCertificate());
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private ObjectMapper om = JacksonSupport.createInMemoryMapper(ids, new YAMLFactory());
|
||||
|
||||
private String output;
|
||||
|
Loading…
x
Reference in New Issue
Block a user