Merge Open Source to Enterprise (#79)

* Check array size before accessing

* Review fixes

* CORDA-540: Make Verifier work in AMQP mode (#1870)

* reference to finance module via not hardcoded group ID (#1515)

*  generic way to reference to group id when loading finance.jar via cordapp

* Fixed the node shell to work with the DataFeed class

* Attempt to make NodeStatePersistenceTests more stable (#1895)

By ensuring that the nodes are properly started and aware of each other before firing any flows through them.
Also minor refactoring.

* Disable unstable test on Windows (#1899)

* CORDA-530 Don't soft-lock non-fungible states (#1794)

* Don't run unlock query if nothing was locked
* Constructors should not have side-effects

* [CORDA-442] let Driver run without network map (#1890)

* [CORDA-442] let Driver run without network map

- Nodes started by driver run without a networkMapNode.

- Driver does not take a networkMapStartStrategy anymore

- a new parameter in the configuration "noNetworkMapServiceMode" allows for a node not to be a networkMapNode nor to connect to one.

- Driver now waits for each node to write its own NodeInfo file to disk and then copies it into each other node.

- When driver starts a node N, it waits for every node to be have N nodes in their network map.

Note: the code to copy around the NodeInfo files was already in DemoBench, the NodeInfoFilesCopier class was just moved from DemoBench into core (I'm very open to core not being the best place, please advise)

* Added missing cordappPackage dependencies. (#1894)

* Eliminate circular dependency of NodeSchedulerService on ServiceHub. (#1891)

* Update customSchemas documentation. (#1902)

* [CORDA-694] Commands visibility for Oracles (without sacrificing privacy) (#1835)

new checkCommandVisibility feature for Oracles

* CORDA-599 PersistentNetworkMapCache no longer circularly depends on SH (#1652)

* CORDA-725 - Change AMQP identifier to officially assigned value

This does change our header format so pre-cached test files need
regenerating

* CORDA-725 - update changelog

* CORDA-680 Update cordapp packages documentation (#1901)

* Introduce MockNetworkParameters

* Cordformation in Kotlin (#1873)

Cordformation rewritten in kotlin.

* Kotlin migration

* Review Comments

* CORDA-704: Implement `@DoNotImplement` annotation (#1903)

* Enhance the API Scanner plugin to monitor class annotations.
* Implement @DoNotImplement annotation, and apply it.
* Update API definition.
* Update API change detection to handle @DoNotImplement.
* Document the `@DoNotImplement` annotation.

* Experimental support for PostgreSQL (#1525)

* Cash selection refactoring such that 3d party DB providers are only required to implement Coin Selection SQL logic.

* Re-added debug logging statement.

* Updated to include PR review feedback from VK

* Refactoring following rebase from master.

* Fix broken JUnits following rebase.

* Use JDBC ResultSet getBlob() and added custom serializer to address concern raised by tomtau in PR.

* Fix failing JUnits.

* Experimental support for PostgreSQL: CashSelection done using window functions

* Moved postgresql version information into corda/build.gradle

* Using a PreparedStatement in CashSelectionPostgreSQLImpl

* Changed the PostgreSQL Cash Selection implementation to use the new refactored AbstractCashSelection

* * Retire MockServiceHubInternal (#1909)

* Introduce rigorousMock
* Add test-utils and node-driver to generated documentation

* Fix-up: Bank Of Corda sample (#1912)

In the previous version when running with `--role ISSUER` the application failed to start.
The reason was that in spite of `quantity` and `currency` were optional,
un-necessary `requestParams` been constructed regardless.

* move SMM

* Interface changes for multi-threading

* CORDA-351: added dependency check plugin to gradle build script (#1911)

* CORDA-351: added dependency check plugin to gradle build script

* CORDA-351: Added suppression stub file with example

* CORDA-351: added suppresionFile property

* CORDA-435 - Ensure Kryo only tests use Kryo serializatin context

Also correct lambda typos (from lamba)

* Network map service REST API wrapper (#1907)

* Network map client - WIP

* Java doc and doc for doc site

* remove javax.ws dependency

* NetworkParameter -> NetworkParameters

* move network map client to node

* Fix jetty test dependencies

* NetworkParameter -> NetworkParameters

* Address PR issues

* Address PR issues and unit test fix

* Address PR issues

* Fixing Bank-Of-Corda Demo in `master` (#1922)

* Fix-up: Bank Of Corda sample

Use correct CorDapp packages to scan

(cherry picked from commit 2caa134)

* Set adequate permissions for the nodes such that NodeExplorer can connect

(cherry picked from commit ae88242)

* Set adequate permissions for the nodes such that NodeExplorer can connect

(cherry picked from commit ae88242)

* Correct run configuration

* Fix-up port numbers

* CORDA-435 - AMQP serialisation cannot work with private vals

They won't be reported as properties by the introspector and thus we
will fail to find a constructor for them. This makes sense as we will be
unable to serialise an object whose members we cannot read

* CORDA-435 - AMQP enablement fixes

AMQP has different serialization rules than Kryo surrounding the way we
introspect objects to work out how to construct them

* [CORDA-442] make MockNetwork not start a networkmap node (#1908)

* [CORDA-442] make MockNetwork not start a networkmap node

Now MockNetwork will put the appropriate NodeInfos inside each running node networkMapCache.

Tests relating to networkmap node starting and interaction have been removed since they where relaying on MockNetwork

* Minor fix for api checker script to support macOS

* Retrofit changes from Enterprise PR #61 (#1934)

* Introduce MockNodeParameters/Args (#1923)

* CORDA-736 Add some new features to corda.jar via node.conf for testing (#1926)

* CORDA-699 Add injection or modification of memory network messages (#1920)

* Updated API stability changeset to reflect new schema attribute name.
This commit is contained in:
josecoll
2017-10-25 13:54:34 +01:00
committed by GitHub
parent 01d8ad41b4
commit ef7ccd3147
200 changed files with 4549 additions and 3192 deletions

View File

@ -1,90 +0,0 @@
package net.corda.node.testing
import com.codahale.metrics.MetricRegistry
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.StateLoader
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.StateLoaderImpl
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.CordaPersistence
import net.corda.testing.DUMMY_IDENTITY_1
import net.corda.testing.MOCK_HOST_AND_PORT
import net.corda.testing.MOCK_IDENTITY_SERVICE
import net.corda.testing.node.MockAttachmentStorage
import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStateMachineRecordedTransactionMappingStorage
import net.corda.testing.node.MockTransactionStorage
import java.nio.file.Paths
import java.sql.Connection
import java.time.Clock
open class MockServiceHubInternal(
override val database: CordaPersistence,
override val configuration: NodeConfiguration,
val customVault: VaultServiceInternal? = null,
val keyManagement: KeyManagementService? = null,
val network: MessagingService? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
override val attachments: AttachmentStorage = MockAttachmentStorage(),
override val validatedTransactions: WritableTransactionStorage = MockTransactionStorage(),
override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage(),
val mapCache: NetworkMapCacheInternal? = null,
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val customContractUpgradeService: ContractUpgradeService? = null,
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2),
override val cordappProvider: CordappProviderInternal = CordappProviderImpl(CordappLoader.createDefault(Paths.get(".")), attachments),
protected val stateLoader: StateLoaderImpl = StateLoaderImpl(validatedTransactions)
) : ServiceHubInternal, StateLoader by stateLoader {
override val transactionVerifierService: TransactionVerifierService
get() = customTransactionVerifierService ?: throw UnsupportedOperationException()
override val vaultService: VaultServiceInternal
get() = customVault ?: throw UnsupportedOperationException()
override val contractUpgradeService: ContractUpgradeService
get() = customContractUpgradeService ?: throw UnsupportedOperationException()
override val keyManagementService: KeyManagementService
get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService
get() = identity ?: throw UnsupportedOperationException()
override val networkService: MessagingService
get() = network ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCacheInternal
get() = mapCache ?: MockNetworkMapCache(this)
override val schedulerService: SchedulerService
get() = scheduler ?: throw UnsupportedOperationException()
override val clock: Clock
get() = overrideClock ?: throw UnsupportedOperationException()
override val myInfo: NodeInfo
get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), listOf(DUMMY_IDENTITY_1), 1, serial = 1L) // Required to get a dummy platformVersion when required for tests.
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val rpcFlows: List<Class<out FlowLogic<*>>>
get() = throw UnsupportedOperationException()
override val schemaService get() = throw UnsupportedOperationException()
override val auditService: AuditService = DummyAuditService()
lateinit var smm: StateMachineManager
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T = throw UnsupportedOperationException()
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl<T> {
return smm.executor.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) }
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? = null
override fun jdbcSession(): Connection = database.createSession()
}

View File

@ -2,14 +2,16 @@
package net.corda.testing
import com.nhaarman.mockito_kotlin.spy
import com.nhaarman.mockito_kotlin.doCallRealMethod
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.User
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
@ -54,26 +56,31 @@ fun transaction(
fun testNodeConfiguration(
baseDirectory: Path,
myLegalName: CordaX500Name,
notaryConfig: NotaryConfig? = null): NodeConfiguration {
myLegalName: CordaX500Name): NodeConfiguration {
abstract class MockableNodeConfiguration : NodeConfiguration // Otherwise Mockito is defeated by val getters.
val nc = spy<MockableNodeConfiguration>()
whenever(nc.baseDirectory).thenReturn(baseDirectory)
whenever(nc.myLegalName).thenReturn(myLegalName)
whenever(nc.minimumPlatformVersion).thenReturn(1)
whenever(nc.keyStorePassword).thenReturn("cordacadevpass")
whenever(nc.trustStorePassword).thenReturn("trustpass")
whenever(nc.rpcUsers).thenReturn(emptyList())
whenever(nc.notary).thenReturn(notaryConfig)
whenever(nc.dataSourceProperties).thenReturn(makeTestDataSourceProperties(myLegalName.organisation))
whenever(nc.database).thenReturn(makeTestDatabaseProperties())
whenever(nc.emailAddress).thenReturn("")
whenever(nc.exportJMXto).thenReturn("")
whenever(nc.devMode).thenReturn(true)
whenever(nc.certificateSigningService).thenReturn(URL("http://localhost"))
whenever(nc.certificateChainCheckPolicies).thenReturn(emptyList())
whenever(nc.verifierType).thenReturn(VerifierType.InMemory)
whenever(nc.messageRedeliveryDelaySeconds).thenReturn(5)
return nc
return rigorousMock<MockableNodeConfiguration>().also {
doReturn(true).whenever(it).noNetworkMapServiceMode
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(myLegalName).whenever(it).myLegalName
doReturn(1).whenever(it).minimumPlatformVersion
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn("trustpass").whenever(it).trustStorePassword
doReturn(emptyList<User>()).whenever(it).rpcUsers
doReturn(null).whenever(it).notary
doReturn(makeTestDataSourceProperties(myLegalName.organisation)).whenever(it).dataSourceProperties
doReturn(makeTestDatabaseProperties()).whenever(it).database
doReturn("").whenever(it).emailAddress
doReturn("").whenever(it).exportJMXto
doReturn(true).whenever(it).devMode
doReturn(URL("http://localhost")).whenever(it).certificateSigningService
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(VerifierType.InMemory).whenever(it).verifierType
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(0L).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).networkMapService
doCallRealMethod().whenever(it).certificatesDirectory
doCallRealMethod().whenever(it).trustStoreFile
doCallRealMethod().whenever(it).sslKeystore
doCallRealMethod().whenever(it).nodeKeystore
}
}

View File

@ -228,7 +228,6 @@ fun <A> rpcDriver(
extraSystemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
initialiseSerialization: Boolean = true,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
dsl: RPCDriverExposedDSLInterface.() -> A
@ -240,7 +239,6 @@ fun <A> rpcDriver(
extraSystemProperties = extraSystemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan

View File

@ -8,7 +8,6 @@ import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.cordform.NodeDefinition
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
@ -20,6 +19,8 @@ import net.corda.core.internal.div
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
@ -28,6 +29,7 @@ import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.*
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.nodeapi.User
import net.corda.nodeapi.config.parseAs
import net.corda.nodeapi.config.toConfig
@ -37,6 +39,8 @@ import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.Logger
import rx.Observable
import rx.observables.ConnectableObservable
import java.io.File
import java.net.*
import java.nio.file.Path
@ -150,14 +154,6 @@ interface DriverDSLExposedInterface : CordformContext {
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
/**
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
* to set networkMapStartStrategy to Dedicated(false) in your [driver] call.
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
* value will be used.
*/
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null, maximumHeapSize: String = "200m"): CordaFuture<NodeHandle>
fun waitForAllNodesToFinish()
/**
@ -212,13 +208,15 @@ sealed class NodeHandle {
override val configuration: FullNodeConfiguration,
override val webAddress: NetworkHostAndPort,
val debugPort: Int?,
val process: Process
val process: Process,
private val onStopCallback: () -> Unit
) : NodeHandle() {
override fun stop(): CordaFuture<Unit> {
with(process) {
destroy()
waitFor()
}
onStopCallback()
return doneFuture(Unit)
}
}
@ -229,7 +227,8 @@ sealed class NodeHandle {
override val configuration: FullNodeConfiguration,
override val webAddress: NetworkHostAndPort,
val node: StartedNode<Node>,
val nodeThread: Thread
val nodeThread: Thread,
private val onStopCallback: () -> Unit
) : NodeHandle() {
override fun stop(): CordaFuture<Unit> {
node.dispose()
@ -237,6 +236,7 @@ sealed class NodeHandle {
interrupt()
join()
}
onStopCallback()
return doneFuture(Unit)
}
}
@ -273,9 +273,8 @@ sealed class PortAllocation {
}
}
/**
* Helper builder for configuring a [Node] from Java.
*/
/** Helper builder for configuring a [Node] from Java. */
@Suppress("unused")
data class NodeParameters(
val providedName: CordaX500Name? = null,
val rpcUsers: List<User> = emptyList(),
@ -319,7 +318,6 @@ data class NodeParameters(
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param extraSystemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node.
* @param networkMapStartStrategy Determines whether a network map node is started automatically.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* @param dsl The dsl itself.
@ -334,7 +332,7 @@ fun <A> driver(
extraSystemProperties: Map<String, String> = defaultParameters.extraSystemProperties,
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
networkMapStartStrategy: NetworkMapStartStrategy = defaultParameters.networkMapStartStrategy,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: DriverDSLExposedInterface.() -> A
@ -347,7 +345,6 @@ fun <A> driver(
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
networkMapStartStrategy = networkMapStartStrategy,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan
),
@ -371,9 +368,8 @@ fun <A> driver(
return driver(defaultParameters = parameters, dsl = dsl)
}
/**
* Helper builder for configuring a [driver] from Java.
*/
/** Helper builder for configuring a [driver] from Java. */
@Suppress("unused")
data class DriverParameters(
val isDebug: Boolean = false,
val driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
@ -382,7 +378,6 @@ data class DriverParameters(
val extraSystemProperties: Map<String, String> = emptyMap(),
val useTestClock: Boolean = false,
val initialiseSerialization: Boolean = true,
val networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true),
val startNodesInProcess: Boolean = false,
val extraCordappPackagesToScan: List<String> = emptyList()
) {
@ -393,7 +388,6 @@ data class DriverParameters(
fun setExtraSystemProperties(extraSystemProperties: Map<String, String>) = copy(extraSystemProperties = extraSystemProperties)
fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock)
fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization)
fun setNetworkMapStartStrategy(networkMapStartStrategy: NetworkMapStartStrategy) = copy(networkMapStartStrategy = networkMapStartStrategy)
fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess)
fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List<String>) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan)
}
@ -608,11 +602,9 @@ class DriverDSL(
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val networkMapStartStrategy: NetworkMapStartStrategy,
val startNodesInProcess: Boolean,
extraCordappPackagesToScan: List<String>
) : DriverDSLInternalInterface {
private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort()
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
@ -620,6 +612,12 @@ class DriverDSL(
private val databaseNamesByNode = mutableMapOf<CordaX500Name, String>()
val systemProperties by lazy { System.getProperties().toList().map { it.first.toString() to it.second.toString() }.toMap() + extraSystemProperties }
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
private val nodeInfoFilesCopier = NodeInfoFilesCopier()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
class State {
val processes = ArrayList<CordaFuture<Process>>()
@ -676,25 +674,6 @@ class DriverDSL(
}
}
private fun networkMapServiceConfigLookup(networkMapCandidates: List<NodeDefinition>): (CordaX500Name) -> Map<String, String>? {
return networkMapStartStrategy.run {
when (this) {
is NetworkMapStartStrategy.Dedicated -> {
serviceConfig(dedicatedNetworkMapAddress).let {
{ _: CordaX500Name -> it }
}
}
is NetworkMapStartStrategy.Nominated -> {
serviceConfig(networkMapCandidates.single {
it.name == legalName.toString()
}.config.getString("p2pAddress").let(NetworkHostAndPort.Companion::parse)).let {
{ nodeName: CordaX500Name -> if (nodeName == legalName) null else it }
}
}
}
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
@ -710,10 +689,6 @@ class DriverDSL(
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(listOf(object : NodeDefinition {
override fun getName() = name.toString()
override fun getConfig() = configOf("p2pAddress" to p2pAddress.toString())
}))
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
@ -722,10 +697,10 @@ class DriverDSL(
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"networkMapService" to networkMapServiceConfigLookup(name),
"useTestClock" to useTestClock,
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
"verifierType" to verifierType.name,
"noNetworkMapServiceMode" to true
) + customOverrides
)
return startNodeInternal(name, config, webAddress, startInSameProcess, maximumHeapSize, logLevel)
@ -741,7 +716,6 @@ class DriverDSL(
}
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> {
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
return nodes.map { node ->
portAllocation.nextHostAndPort() // rpcAddress
val webAddress = portAllocation.nextHostAndPort()
@ -752,8 +726,8 @@ class DriverDSL(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = node.config + notary + mapOf(
"networkMapService" to networkMapServiceConfigLookup(name),
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers,
"noNetworkMapServiceMode" to true
)
)
startNodeInternal(name, config, webAddress, startInSameProcess, maximumHeapSize)
@ -839,9 +813,7 @@ class DriverDSL(
override fun start() {
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
if (networkMapStartStrategy.startDedicated) {
startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes.
}
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
}
fun baseDirectory(nodeName: CordaX500Name): Path {
@ -852,29 +824,51 @@ class DriverDSL(
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
override fun startDedicatedNetworkMapService(startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
val webAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val networkMapLegalName = networkMapStartStrategy.legalName
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(networkMapLegalName),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to networkMapLegalName.toString(),
// TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all
// node port numbers to be shifted, so all demos and docs need to be updated accordingly.
"webAddress" to webAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"rpcUsers" to defaultRpcUserList,
"p2pAddress" to dedicatedNetworkMapAddress.toString(),
"useTestClock" to useTestClock)
)
return startNodeInternal(networkMapLegalName, config, webAddress, startInProcess, maximumHeapSize)
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
private fun startNodeInternal(name: CordaX500Name, config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String, logLevel: String? = null): CordaFuture<NodeHandle> {
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables.put(rpc.nodeInfo().legalIdentities.first().name, counterObservable)
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args : Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startNodeInternal(name: CordaX500Name,config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String, logLevel: String? = null): CordaFuture<NodeHandle> {
val globalDataSourceProperties = mutableMapOf<String, Any?>()
val overriddenDatasourceUrl = systemProperties["dataSourceProperties.dataSource.url"]
@ -882,9 +876,13 @@ class DriverDSL(
val connectionString = overriddenDatasourceUrl + "/" + databaseNamesByNode.computeIfAbsent(name, { UUID.randomUUID().toString() })
globalDataSourceProperties["dataSourceProperties.dataSource.url"] = connectionString
}
val enhancedConfig = config + globalDataSourceProperties
val enhancedConfig = config+ globalDataSourceProperties
val nodeConfiguration = (enhancedConfig).parseAs<FullNodeConfiguration>()
nodeInfoFilesCopier.addConfig(nodeConfiguration.baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier.removeConfig(nodeConfiguration.baseDirectory)
countObservables.remove(nodeConfiguration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, enhancedConfig, cordappPackages)
shutdownManager.registerShutdown(
@ -897,8 +895,8 @@ class DriverDSL(
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(nodeConfiguration, openFuture()).flatMap { rpc ->
rpc.waitUntilNetworkReady().map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, node, thread)
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, node, thread, onNodeExit)
}
}
}
@ -913,7 +911,7 @@ class DriverDSL(
establishRpc(nodeConfiguration, processDeathFuture).flatMap { rpc ->
// Call waitUntilNetworkReady in background in case RPC is failing over:
val forked = executorService.fork {
rpc.waitUntilNetworkReady()
allNodesConnected(rpc)
}
val networkMapFuture = forked.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
@ -922,7 +920,8 @@ class DriverDSL(
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: ${webAddress}")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, debugPort, process)
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, debugPort, process,
onNodeExit)
}
}
}
@ -977,7 +976,7 @@ class DriverDSL(
logLevel: String? = null
): CordaFuture<Process> {
val processFuture = executorService.fork {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}")
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + debugPort ?: "not enabled")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)

View File

@ -1,23 +0,0 @@
package net.corda.testing.driver
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.DUMMY_MAP
sealed class NetworkMapStartStrategy {
internal abstract val startDedicated: Boolean
internal abstract val legalName: CordaX500Name
internal fun serviceConfig(address: NetworkHostAndPort) = mapOf(
"address" to address.toString(),
"legalName" to legalName.toString()
)
class Dedicated(startAutomatically: Boolean) : NetworkMapStartStrategy() {
override val startDedicated = startAutomatically
override val legalName = DUMMY_MAP.name
}
class Nominated(override val legalName: CordaX500Name) : NetworkMapStartStrategy() {
override val startDedicated = false
}
}

View File

@ -279,7 +279,7 @@ class InMemoryMessagingNetwork(
_sentMessages.onNext(transfer)
}
private data class InMemoryMessage(override val topicSession: TopicSession,
data class InMemoryMessage(override val topicSession: TopicSession,
override val data: ByteArray,
override val uniqueMessageId: UUID,
override val debugTimestamp: Instant = Instant.now()) : Message {
@ -363,14 +363,22 @@ class InMemoryMessagingNetwork(
state.locked { check(handlers.remove(registration as Handler)) }
}
override fun send(message: Message, target: MessageRecipients, retryId: Long?) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, acknowledgementHandler: (() -> Unit)?) {
check(running)
msgSend(this, message, target)
acknowledgementHandler?.invoke()
if (!sendManuallyPumped) {
pumpSend(false)
}
}
override fun send(addressedMessages: List<MessagingService.AddressedMessage>, acknowledgementHandler: (() -> Unit)?) {
for ((message, target, retryId, sequenceKey) in addressedMessages) {
send(message, target, retryId, sequenceKey, null)
}
acknowledgementHandler?.invoke()
}
override fun stop() {
if (backgroundThread != null) {
backgroundThread.interrupt()

View File

@ -7,9 +7,9 @@ import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.NonEmptySet
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.utilities.CordaPersistence
import net.corda.testing.getTestPartyAndCertificate
import rx.Observable
import rx.subjects.PublishSubject
@ -18,7 +18,7 @@ import java.math.BigInteger
/**
* Network map cache with no backing map service.
*/
class MockNetworkMapCache(serviceHub: ServiceHubInternal) : PersistentNetworkMapCache(serviceHub) {
class MockNetworkMapCache(database: CordaPersistence, configuration: NodeConfiguration) : PersistentNetworkMapCache(database, configuration) {
private companion object {
val BANK_C = getTestPartyAndCertificate(CordaX500Name(organisation = "Bank C", locality = "London", country = "GB"), entropyToKeyPair(BigInteger.valueOf(1000)).public)
val BANK_D = getTestPartyAndCertificate(CordaX500Name(organisation = "Bank D", locality = "London", country = "GB"), entropyToKeyPair(BigInteger.valueOf(2000)).public)

View File

@ -2,13 +2,12 @@ package net.corda.testing.node
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.cert
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.createDirectory
@ -18,7 +17,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
@ -32,9 +31,8 @@ import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.*
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
@ -42,11 +40,13 @@ import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CertificateAndKeyPair
import net.corda.nodeapi.internal.ServiceInfo
import net.corda.testing.*
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.initialiseTestSerialization
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.resetTestSerialization
import net.corda.testing.testNodeConfiguration
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import java.io.Closeable
@ -54,7 +54,7 @@ import java.math.BigInteger
import java.nio.file.Path
import java.security.KeyPair
import java.security.PublicKey
import java.security.cert.X509Certificate
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@ -62,6 +62,51 @@ fun StartedNode<MockNetwork.MockNode>.pumpReceive(block: Boolean = false): InMem
return (network as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
}
/** Helper builder for configuring a [MockNetwork] from Java. */
@Suppress("unused")
data class MockNetworkParameters(
val networkSendManuallyPumped: Boolean = false,
val threadPerNode: Boolean = false,
val servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
val defaultFactory: MockNetwork.Factory<*> = MockNetwork.DefaultFactory,
val initialiseSerialization: Boolean = true,
val cordappPackages: List<String> = emptyList()) {
fun setNetworkSendManuallyPumped(networkSendManuallyPumped: Boolean) = copy(networkSendManuallyPumped = networkSendManuallyPumped)
fun setThreadPerNode(threadPerNode: Boolean) = copy(threadPerNode = threadPerNode)
fun setServicePeerAllocationStrategy(servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy) = copy(servicePeerAllocationStrategy = servicePeerAllocationStrategy)
fun setDefaultFactory(defaultFactory: MockNetwork.Factory<*>) = copy(defaultFactory = defaultFactory)
fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization)
fun setCordappPackages(cordappPackages: List<String>) = copy(cordappPackages = cordappPackages)
}
/**
* @param notaryIdentity a set of service entries to use in place of the node's default service entries,
* for example where a node's service is part of a cluster.
* @param entropyRoot the initial entropy value to use when generating keys. Defaults to an (insecure) random value,
* but can be overridden to cause nodes to have stable or colliding identity/service keys.
* @param configOverrides add/override behaviour of the [NodeConfiguration] mock object.
*/
@Suppress("unused")
data class MockNodeParameters(
val forcedID: Int? = null,
val legalName: CordaX500Name? = null,
val notaryIdentity: Pair<ServiceInfo, KeyPair>? = null,
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
val configOverrides: (NodeConfiguration) -> Any? = {}) {
fun setForcedID(forcedID: Int?) = copy(forcedID = forcedID)
fun setLegalName(legalName: CordaX500Name?) = copy(legalName = legalName)
fun setNotaryIdentity(notaryIdentity: Pair<ServiceInfo, KeyPair>?) = copy(notaryIdentity = notaryIdentity)
fun setEntropyRoot(entropyRoot: BigInteger) = copy(entropyRoot = entropyRoot)
fun setConfigOverrides(configOverrides: (NodeConfiguration) -> Any?) = copy(configOverrides = configOverrides)
}
data class MockNodeArgs(
val config: NodeConfiguration,
val network: MockNetwork,
val id: Int,
val notaryIdentity: Pair<ServiceInfo, KeyPair>?,
val entropyRoot: BigInteger)
/**
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem or an in
@ -75,17 +120,16 @@ fun StartedNode<MockNetwork.MockNode>.pumpReceive(block: Boolean = false): InMem
*
* LogHelper.setLevel("+messages")
*/
class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
private val threadPerNode: Boolean = false,
servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy =
InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
private val defaultFactory: Factory<*> = MockNetwork.DefaultFactory,
private val initialiseSerialization: Boolean = true,
private val cordappPackages: List<String> = emptyList()) : Closeable {
companion object {
// TODO In future PR we're removing the concept of network map node so the details of this mock are not important.
val MOCK_NET_MAP = Party(CordaX500Name(organisation = "Mock Network Map", locality = "Madrid", country = "ES"), DUMMY_KEY_1.public)
}
class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParameters(),
private val networkSendManuallyPumped: Boolean = defaultParameters.networkSendManuallyPumped,
private val threadPerNode: Boolean = defaultParameters.threadPerNode,
servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy = defaultParameters.servicePeerAllocationStrategy,
private val defaultFactory: Factory<*> = defaultParameters.defaultFactory,
private val initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
private val cordappPackages: List<String> = defaultParameters.cordappPackages) : Closeable {
/** Helper constructor for creating a [MockNetwork] with custom parameters from Java. */
constructor(parameters: MockNetworkParameters) : this(defaultParameters = parameters)
var nextNodeId = 0
private set
private val filesystem = Jimfs.newFileSystem(unix())
@ -97,9 +141,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
/** A read only view of the current set of executing nodes. */
val nodes: List<MockNode> get() = _nodes
private var _networkMapNode: StartedNode<MockNode>? = null
val networkMapNode: StartedNode<MockNode> get() = _networkMapNode ?: startNetworkMapNode()
init {
if (initialiseSerialization) initialiseTestSerialization()
filesystem.getPath("/nodes").createDirectory()
@ -107,21 +148,11 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
/** Allows customisation of how nodes are created. */
interface Factory<out N : MockNode> {
/**
* @param notaryIdentity is an additional override to use in place of the node's default notary service,
* main usage is for when the node is part of a notary cluster.
* @param entropyRoot the initial entropy value to use when generating keys. Defaults to an (insecure) random value,
* but can be overriden to cause nodes to have stable or colliding identity/service keys.
*/
fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
id: Int, notaryIdentity: Pair<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): N
fun create(args: MockNodeArgs): N
}
object DefaultFactory : Factory<MockNode> {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
id: Int, notaryIdentity: Pair<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNode {
return MockNode(config, network, networkMapAddr, id, notaryIdentity, entropyRoot)
}
override fun create(args: MockNodeArgs) = MockNode(args)
}
/**
@ -147,19 +178,17 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
}
}
/**
* @param notaryIdentity is an additional override to use in place of the node's default notary service,
* main usage is for when the node is part of a notary cluster.
* @param entropyRoot the initial entropy value to use when generating keys. Defaults to an (insecure) random value,
* but can be overriden to cause nodes to have stable or colliding identity/service keys.
*/
open class MockNode(config: NodeConfiguration,
val mockNet: MockNetwork,
override val networkMapAddress: SingleMessageRecipient?,
val id: Int,
internal val notaryIdentity: Pair<ServiceInfo, KeyPair>?,
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) :
AbstractNode(config, TestClock(), MOCK_VERSION_INFO, CordappLoader.createDefaultWithTestPackages(config, mockNet.cordappPackages), mockNet.busyLatch) {
open class MockNode(args: MockNodeArgs) : AbstractNode(
args.config,
TestClock(),
MOCK_VERSION_INFO,
CordappLoader.createDefaultWithTestPackages(args.config, args.network.cordappPackages),
args.network.busyLatch) {
val mockNet = args.network
override val networkMapAddress = null
val id = args.id
internal val notaryIdentity = args.notaryIdentity
val entropyRoot = args.entropyRoot
var counter = entropyRoot
override val log: Logger = loggerFor<MockNode>()
override val serverThread: AffinityExecutor =
@ -187,24 +216,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
.getOrThrow()
}
override fun makeIdentityService(trustRoot: X509Certificate,
clientCa: CertificateAndKeyPair?,
legalIdentity: PartyAndCertificate): IdentityService {
val caCertificates: Array<X509Certificate> = listOf(legalIdentity.certificate, clientCa?.certificate?.cert)
.filterNotNull()
.toTypedArray()
val identityService = PersistentIdentityService(info.legalIdentitiesAndCerts,
trustRoot = trustRoot, caCertificates = *caCertificates)
services.networkMapCache.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } }
services.networkMapCache.changed.subscribe { mapChange ->
// TODO how should we handle network map removal
if (mapChange is NetworkMapCache.MapChange.Added) {
mapChange.node.legalIdentitiesAndCerts.forEach {
identityService.verifyAndRegisterIdentity(it)
}
}
}
return identityService
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
network = messagingServiceSpy
}
override fun makeKeyManagementService(identityService: IdentityService): KeyManagementService {
@ -262,6 +275,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
dbCloser = null
}
fun hasDBConnection() = dbCloser != null
// You can change this from zero if you have custom [FlowLogic] that park themselves. e.g. [StateMachineManagerTests]
var acceptableLiveFiberCountOnStop: Int = 0
@ -275,92 +290,40 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
throw IllegalStateException("Unable to enumerate all nodes in BFT cluster.")
}
clusterNodes.forEach {
val notaryService = it.started!!.smm.findServices { it is BFTNonValidatingNotaryService }.single() as BFTNonValidatingNotaryService
val notaryService = it.findTokenizableService(BFTNonValidatingNotaryService::class.java)!!
notaryService.waitUntilReplicaHasInitialized()
}
}
}
}
/**
* Makes sure that the [MockNode] is correctly registered on the [MockNetwork]
* Please note that [MockNetwork.runNetwork] should be invoked to ensure that all the pending registration requests
* were duly processed
*/
fun ensureRegistered() {
_nodeReadyFuture.getOrThrow()
}
}
fun <N : MockNode> startNetworkMapNode(nodeFactory: Factory<N>? = null): StartedNode<N> {
check(_networkMapNode == null) { "Trying to start more than one network map node" }
return uncheckedCast(createNodeImpl(networkMapAddress = null,
forcedID = null,
nodeFactory = nodeFactory ?: defaultFactory,
legalName = MOCK_NET_MAP.name,
notaryIdentity = null,
entropyRoot = BigInteger.valueOf(random63BitValue()),
configOverrides = {},
start = true
).started!!.apply {
_networkMapNode = this
})
}
fun createUnstartedNode(forcedID: Int? = null,
legalName: CordaX500Name? = null, notaryIdentity: Pair<ServiceInfo, KeyPair>? = null,
entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
configOverrides: (NodeConfiguration) -> Any? = {}): MockNode {
return createUnstartedNode(forcedID, defaultFactory, legalName, notaryIdentity, entropyRoot, configOverrides = configOverrides)
}
fun <N : MockNode> createUnstartedNode(forcedID: Int? = null, nodeFactory: Factory<N>,
legalName: CordaX500Name? = null, notaryIdentity: Pair<ServiceInfo, KeyPair>? = null,
entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
configOverrides: (NodeConfiguration) -> Any? = {}): N {
val networkMapAddress = networkMapNode.network.myAddress
return createNodeImpl(networkMapAddress, forcedID, nodeFactory, false, legalName, notaryIdentity, entropyRoot, configOverrides)
}
/**
* Returns a node, optionally created by the passed factory method.
* @param notaryIdentity a set of service entries to use in place of the node's default service entries,
* for example where a node's service is part of a cluster.
* @param entropyRoot the initial entropy value to use when generating keys. Defaults to an (insecure) random value,
* but can be overridden to cause nodes to have stable or colliding identity/service keys.
* @param configOverrides add/override behaviour of the [NodeConfiguration] mock object.
*/
fun createNode(forcedID: Int? = null,
legalName: CordaX500Name? = null, notaryIdentity: Pair<ServiceInfo, KeyPair>? = null,
entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
configOverrides: (NodeConfiguration) -> Any? = {}): StartedNode<MockNode> {
return createNode(forcedID, defaultFactory, legalName, notaryIdentity, entropyRoot, configOverrides = configOverrides)
fun createUnstartedNode(parameters: MockNodeParameters = MockNodeParameters()) = createUnstartedNode(parameters, defaultFactory)
fun <N : MockNode> createUnstartedNode(parameters: MockNodeParameters = MockNodeParameters(), nodeFactory: Factory<N>): N {
return createNodeImpl(parameters, nodeFactory, false)
}
fun createNode(parameters: MockNodeParameters = MockNodeParameters()): StartedNode<MockNode> = createNode(parameters, defaultFactory)
/** Like the other [createNode] but takes a [Factory] and propagates its [MockNode] subtype. */
fun <N : MockNode> createNode(forcedID: Int? = null, nodeFactory: Factory<N>,
legalName: CordaX500Name? = null, notaryIdentity: Pair<ServiceInfo, KeyPair>? = null,
entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue()),
configOverrides: (NodeConfiguration) -> Any? = {}): StartedNode<N> {
val networkMapAddress = networkMapNode.network.myAddress
return uncheckedCast(createNodeImpl(networkMapAddress, forcedID, nodeFactory, true, legalName, notaryIdentity, entropyRoot, configOverrides).started)!!
fun <N : MockNode> createNode(parameters: MockNodeParameters = MockNodeParameters(), nodeFactory: Factory<N>): StartedNode<N> {
val node: StartedNode<N> = uncheckedCast(createNodeImpl(parameters, nodeFactory, true).started)!!
ensureAllNetworkMapCachesHaveAllNodeInfos()
return node
}
private fun <N : MockNode> createNodeImpl(networkMapAddress: SingleMessageRecipient?, forcedID: Int?, nodeFactory: Factory<N>,
start: Boolean, legalName: CordaX500Name?, notaryIdentity: Pair<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger,
configOverrides: (NodeConfiguration) -> Any?): N {
val id = forcedID ?: nextNodeId++
private fun <N : MockNode> createNodeImpl(parameters: MockNodeParameters, nodeFactory: Factory<N>, start: Boolean): N {
val id = parameters.forcedID ?: nextNodeId++
val config = testNodeConfiguration(
baseDirectory = baseDirectory(id).createDirectories(),
myLegalName = legalName ?: CordaX500Name(organisation = "Mock Company $id", locality = "London", country = "GB")).also {
whenever(it.dataSourceProperties).thenReturn(makeTestDataSourceProperties("node_${id}_net_$networkId"))
configOverrides(it)
myLegalName = parameters.legalName ?: CordaX500Name(organisation = "Mock Company $id", locality = "London", country = "GB")).also {
doReturn(makeTestDataSourceProperties("node_${id}_net_$networkId")).whenever(it).dataSourceProperties
parameters.configOverrides(it)
}
return nodeFactory.create(config, this, networkMapAddress, id, notaryIdentity, entropyRoot).apply {
return nodeFactory.create(MockNodeArgs(config, this, id, parameters.notaryIdentity, parameters.entropyRoot)).apply {
if (start) {
start()
if (threadPerNode && networkMapAddress != null) nodeReadyFuture.getOrThrow() // XXX: What about manually-started nodes?
if (threadPerNode) nodeReadyFuture.getOrThrow() // XXX: What about manually-started nodes?
ensureAllNetworkMapCachesHaveAllNodeInfos()
}
_nodes.add(this)
}
@ -376,6 +339,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
*/
@JvmOverloads
fun runNetwork(rounds: Int = -1) {
ensureAllNetworkMapCachesHaveAllNodeInfos()
check(!networkSendManuallyPumped)
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
@ -391,23 +355,24 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
@JvmOverloads
fun createNotaryNode(legalName: CordaX500Name = DUMMY_NOTARY.name, validating: Boolean = true): StartedNode<MockNode> {
return createNode(legalName = legalName, configOverrides = {
whenever(it.notary).thenReturn(NotaryConfig(validating))
})
return createNode(MockNodeParameters(legalName = legalName, configOverrides = {
doReturn(NotaryConfig(validating)).whenever(it).notary
}))
}
fun <N : MockNode> createNotaryNode(legalName: CordaX500Name = DUMMY_NOTARY.name,
fun <N : MockNode> createNotaryNode(parameters: MockNodeParameters = MockNodeParameters(legalName = DUMMY_NOTARY.name),
validating: Boolean = true,
nodeFactory: Factory<N>): StartedNode<N> {
return createNode(legalName = legalName, nodeFactory = nodeFactory, configOverrides = {
whenever(it.notary).thenReturn(NotaryConfig(validating))
})
return createNode(parameters.copy(configOverrides = {
doReturn(NotaryConfig(validating)).whenever(it).notary
parameters.configOverrides(it)
}), nodeFactory)
}
@JvmOverloads
fun createPartyNode(legalName: CordaX500Name? = null,
notaryIdentity: Pair<ServiceInfo, KeyPair>? = null): StartedNode<MockNode> {
return createNode(legalName = legalName, notaryIdentity = notaryIdentity)
return createNode(MockNodeParameters(legalName = legalName, notaryIdentity = notaryIdentity))
}
@Suppress("unused") // This is used from the network visualiser tool.
@ -422,9 +387,21 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
}
}
private fun ensureAllNetworkMapCachesHaveAllNodeInfos() {
val infos = nodes.mapNotNull { it.started?.info }
nodes.filter { it.hasDBConnection() }
.mapNotNull { it.started?.services?.networkMapCache }
.forEach {
for (nodeInfo in infos) {
it.addNode(nodeInfo)
}
}
}
fun startNodes() {
require(nodes.isNotEmpty())
nodes.forEach { it.started ?: it.start() }
ensureAllNetworkMapCachesHaveAllNodeInfos()
}
fun stopNodes() {
@ -450,4 +427,16 @@ fun network(nodesCount: Int, action: MockNetwork.(nodes: List<StartedNode<MockNe
val nodes = (1..nodesCount).map { _ -> it.createPartyNode() }
action(it, nodes, notary)
}
}
/**
* Extend this class in order to intercept and modify messages passing through the [MessagingService] when using the [InMemoryNetwork].
*/
open class MessagingServiceSpy(val messagingService: MessagingService) : MessagingService by messagingService
/**
* Attach a [MessagingServiceSpy] to the [MockNode] allowing interception and modification of messages.
*/
fun StartedNode<MockNetwork.MockNode>.setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
internals.setMessagingServiceSpy(messagingServiceSpy)
}

View File

@ -30,6 +30,7 @@ import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.CordaPersistence
@ -179,7 +180,7 @@ open class MockServices(
fun makeVaultService(hibernateConfig: HibernateConfiguration): VaultServiceInternal {
val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, stateLoader, hibernateConfig)
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, hibernateConfig)
return vaultService
}

View File

@ -14,8 +14,8 @@ import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.testing.MockServiceHubInternal
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
@ -42,7 +42,7 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort
val executor = ServiceAffinityExecutor(config.myLegalName.organisation, 1)
// TODO: We should have a dummy service hub rather than change behaviour in tests
val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port,
MockNetworkMapCache(serviceHub = object : MockServiceHubInternal(database = database, configuration = config) {}), userService)
NetworkMapCacheImpl(MockNetworkMapCache(database, config), identityService), userService)
val networkMapRegistrationFuture = openFuture<Unit>()
val network = database.transaction {
NodeMessagingClient(